v2 is comming
diff --git a/wally/pretty_yaml.py b/wally/pretty_yaml.py
index 2cb5607..afb6375 100644
--- a/wally/pretty_yaml.py
+++ b/wally/pretty_yaml.py
@@ -89,7 +89,8 @@
if len(val_res) == 1 and \
len(key_str + val_res[0]) < width and \
- not isinstance(v, dict):
+ not isinstance(v, dict) and \
+ not val_res[0].strip().startswith('-'):
res.append(key_str + val_res[0])
else:
res.append(key_str)
diff --git a/wally/report.py b/wally/report.py
index e893c25..16c487d 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -15,7 +15,7 @@
import wally
from wally.utils import ssize2b
-from wally.statistic import round_3_digit, data_property
+from wally.statistic import round_3_digit
from wally.suits.io.fio_task_parser import (get_test_sync_mode,
get_test_summary,
parse_all_in_1,
@@ -86,83 +86,11 @@
name_map = collections.defaultdict(lambda: [])
for data in test_data:
- name_map[(data.config.name, data.summary())].append(data)
+ name_map[(data.name, data.summary())].append(data)
return name_map
-def get_lat_perc_50_95(lat_mks):
- curr_perc = 0
- perc_50 = None
- perc_95 = None
- pkey = None
- for key, val in sorted(lat_mks.items()):
- if curr_perc + val >= 50 and perc_50 is None:
- if pkey is None or val < 1.:
- perc_50 = key
- else:
- perc_50 = (50. - curr_perc) / val * (key - pkey) + pkey
-
- if curr_perc + val >= 95:
- if pkey is None or val < 1.:
- perc_95 = key
- else:
- perc_95 = (95. - curr_perc) / val * (key - pkey) + pkey
- break
-
- pkey = key
- curr_perc += val
-
- return perc_50 / 1000., perc_95 / 1000.
-
-
-def process_disk_info(test_data):
-
- name_map = group_by_name(test_data)
- data = {}
- for (name, summary), results in name_map.items():
- lat_mks = collections.defaultdict(lambda: 0)
- num_res = 0
-
- for result in results:
- num_res += len(result.raw_result['jobs'])
- for job_info in result.raw_result['jobs']:
- for k, v in job_info['latency_ms'].items():
- if isinstance(k, basestring) and k.startswith('>='):
- lat_mks[int(k[2:]) * 1000] += v
- else:
- lat_mks[int(k) * 1000] += v
-
- for k, v in job_info['latency_us'].items():
- lat_mks[int(k)] += v
-
- for k, v in lat_mks.items():
- lat_mks[k] = float(v) / num_res
-
- testnodes_count_set = set(dt.vm_count for dt in results)
-
- assert len(testnodes_count_set) == 1
- testnodes_count, = testnodes_count_set
- assert len(results) % testnodes_count == 0
-
- intervals = [result.run_interval for result in results]
- p = results[0].config
- pinfo = PerfInfo(p.name, result.summary(), intervals,
- p, testnodes_count)
-
- pinfo.raw_bw = [result.results['bw'] for result in results]
- pinfo.raw_iops = [result.results['iops'] for result in results]
- pinfo.raw_lat = [result.results['lat'] for result in results]
-
- pinfo.bw = data_property(map(sum, zip(*pinfo.raw_bw)))
- pinfo.iops = data_property(map(sum, zip(*pinfo.raw_iops)))
- pinfo.lat = data_property(sum(pinfo.raw_lat, []))
- pinfo.lat_50, pinfo.lat_95 = get_lat_perc_50_95(lat_mks)
-
- data[(p.name, summary)] = pinfo
- return data
-
-
def report(name, required_fields):
def closure(func):
report_funcs.append((required_fields.split(","), name, func))
@@ -469,12 +397,17 @@
def make_plots(processed_results, plots):
+ """
+ processed_results: [PerfInfo]
+ plots = [(test_name_prefix:str, fname:str, description:str)]
+ """
files = {}
for name_pref, fname, desc in plots:
chart_data = []
- for res in processed_results.values():
- if res.name.startswith(name_pref):
+ for res in processed_results:
+ summ = res.name + "_" + res.summary
+ if summ.startswith(name_pref):
chart_data.append(res)
if len(chart_data) == 0:
@@ -482,12 +415,8 @@
use_bw = ssize2b(chart_data[0].p.blocksize) > 16 * 1024
- chart_data.sort(key=lambda x: x.concurence)
+ chart_data.sort(key=lambda x: x.params['vals']['numjobs'])
- # if x.lat.average < max_lat]
- # lat = [x.lat.average / 1000 for x in chart_data]
- # lat_min = [x.lat.min / 1000 for x in chart_data]
- # lat_max = [x.lat.max / 1000 for x in chart_data]
lat = None
lat_min = None
lat_max = None
@@ -509,10 +438,18 @@
fc = io_chart(title=desc,
concurence=concurence,
- latv=lat, latv_min=lat_min, latv_max=lat_max,
+
+ latv=lat,
+ latv_min=lat_min,
+ latv_max=lat_max,
+
iops_or_bw=data,
iops_or_bw_err=data_dev,
- legend=name, latv_50=lat_50, latv_95=lat_95)
+
+ legend=name,
+
+ latv_50=lat_50,
+ latv_95=lat_95)
files[fname] = fc
return files
@@ -521,7 +458,7 @@
def find_max_where(processed_results, sync_mode, blocksize, rw, iops=True):
result = None
attr = 'iops' if iops else 'bw'
- for measurement in processed_results.values():
+ for measurement in processed_results:
ok = measurement.sync_mode == sync_mode
ok = ok and (measurement.p.blocksize == blocksize)
ok = ok and (measurement.p.rw == rw)
@@ -568,7 +505,7 @@
'd', '1m', 'read', False)
rws4k_iops_lat_th = []
- for res in processed_results.values():
+ for res in processed_results:
if res.sync_mode in 'xs' and res.p.blocksize == '4k':
if res.p.rw != 'randwrite':
continue
@@ -636,8 +573,9 @@
('hdd_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
('hdd_rwx4k', 'rand_write_4k', 'Random write 4k sync IOPS')
]
- images = make_plots(processed_results, plots)
- di = get_disk_info(processed_results)
+ perf_infos = [res.disk_perf_info() for res in processed_results]
+ images = make_plots(perf_infos, plots)
+ di = get_disk_info(perf_infos)
return render_all_html(comment, di, lab_info, images, "report_hdd.html")
@@ -647,15 +585,16 @@
('cinder_iscsi_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
('cinder_iscsi_rwx4k', 'rand_write_4k', 'Random write 4k sync IOPS')
]
+ perf_infos = [res.disk_perf_info() for res in processed_results]
try:
- images = make_plots(processed_results, plots)
+ images = make_plots(perf_infos, plots)
except ValueError:
plots = [
('cinder_iscsi_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
('cinder_iscsi_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS')
]
- images = make_plots(processed_results, plots)
- di = get_disk_info(processed_results)
+ images = make_plots(perf_infos, plots)
+ di = get_disk_info(perf_infos)
return render_all_html(comment, di, lab_info, images, "report_cinder_iscsi.html")
@@ -669,8 +608,9 @@
'Random write 16m direct MiBps'),
]
- images = make_plots(processed_results, plots)
- di = get_disk_info(processed_results)
+ perf_infos = [res.disk_perf_info() for res in processed_results]
+ images = make_plots(perf_infos, plots)
+ di = get_disk_info(perf_infos)
return render_all_html(comment, di, lab_info, images, "report_ceph.html")
@@ -679,9 +619,12 @@
#
# IOPS(X% read) = 100 / ( X / IOPS_W + (100 - X) / IOPS_R )
#
- is_ssd = True
+
+ perf_infos = [res.disk_perf_info() for res in processed_results]
mixed = collections.defaultdict(lambda: [])
- for res in processed_results.values():
+
+ is_ssd = False
+ for res in perf_infos:
if res.name.startswith('mixed'):
if res.name.startswith('mixed-ssd'):
is_ssd = True
@@ -790,7 +733,7 @@
}
try:
- res_fields = sorted(v.name for v in dinfo.values())
+ res_fields = sorted(v.name for v in dinfo)
found = False
for fields, name, func in report_funcs:
diff --git a/wally/run_test.py b/wally/run_test.py
index d7e803a..0dab8c7 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -528,8 +528,7 @@
with open(text_rep_fname, "w") as fd:
for tp, data in ctx.results.items():
if 'io' == tp and data is not None:
- dinfo = report.process_disk_info(data)
- rep = IOPerfTest.format_for_console(data, dinfo)
+ rep = IOPerfTest.format_for_console(data)
elif tp in ['mysql', 'pgbench'] and data is not None:
rep = MysqlTest.format_for_console(data)
else:
@@ -571,8 +570,7 @@
"report, except first are skipped")
continue
found = True
- dinfo = report.process_disk_info(data)
- report.make_io_report(dinfo,
+ report.make_io_report(data,
cfg.get('comment', ''),
html_rep_fname,
lab_info=ctx.hw_info)
@@ -798,7 +796,6 @@
for stage in stages:
ok = False
with log_stage(stage):
- logger.info("Start " + get_stage_name(stage))
stage(cfg_dict, ctx)
ok = True
if not ok:
diff --git a/wally/statistic.py b/wally/statistic.py
index 74ce572..fb05acb 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -11,6 +11,10 @@
no_numpy = True
+def average(data):
+ return sum(data) / len(data)
+
+
def med_dev(vals):
med = sum(vals) / len(vals)
dev = ((sum(abs(med - i) ** 2.0 for i in vals) / len(vals)) ** 0.5)
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index 4a651e6..fc96a52 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -5,7 +5,7 @@
NUMJOBS_SHORT={% 1, 2, 3, 10 %}
ramp_time=15
-runtime=60
+runtime=120
# ---------------------------------------------------------------------
# check different thread count, sync mode. (latency, iops) = func(th_count)
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index c650069..2a76fc2 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -15,13 +15,14 @@
from concurrent.futures import ThreadPoolExecutor
from wally.pretty_yaml import dumps
-from wally.statistic import round_3_digit, data_property
+from wally.statistic import round_3_digit, data_property, average
from wally.utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask, reconnect)
from .fio_task_parser import (execution_time, fio_cfg_compile,
- get_test_summary, get_test_sync_mode)
-from ..itest import TimeSeriesValue, PerfTest, TestResults, run_on_node
+ get_test_summary, get_test_sync_mode, FioJobSection)
+from ..itest import (TimeSeriesValue, PerfTest, TestResults,
+ run_on_node, TestConfig, MeasurementMatrix)
logger = logging.getLogger("wally")
@@ -52,7 +53,12 @@
def load_fio_log_file(fname):
with open(fname) as fd:
it = [ln.split(',')[:2] for ln in fd]
- vals = [(float(off) / 1000, float(val.strip())) for off, val in it]
+
+ vals = [(float(off) / 1000, # convert us to ms
+ float(val.strip()) + 0.5) # add 0.5 to compemsate average value
+ # as fio trimm all values in log to integer
+ for off, val in it]
+
return TimeSeriesValue(vals)
@@ -63,9 +69,9 @@
fn = os.path.join(folder, str(run_num) + '_params.yaml')
params = yaml.load(open(fn).read())
- conn_ids = set()
+ conn_ids_set = set()
+ rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
for fname in os.listdir(folder):
- rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
rm = re.match(rr, fname)
if rm is None:
continue
@@ -77,16 +83,29 @@
if ftype not in ('iops', 'bw', 'lat'):
continue
- try:
- ts = load_fio_log_file(os.path.join(folder, fname))
- if ftype in res:
- assert conn_id not in res[ftype]
+ ts = load_fio_log_file(os.path.join(folder, fname))
+ res.setdefault(ftype, {}).setdefault(conn_id, []).append(ts)
- res.setdefault(ftype, {})[conn_id] = ts
- except AssertionError:
- pass
+ conn_ids_set.add(conn_id)
- conn_ids.add(conn_id)
+ mm_res = {}
+
+ for key, data in res.items():
+ conn_ids = sorted(conn_ids_set)
+ matr = [data[conn_id] for conn_id in conn_ids]
+
+ mm_res[key] = MeasurementMatrix(matr, conn_ids)
+
+ iops_from_lat_matr = []
+ for node_ts in mm_res['lat'].data:
+ iops_from_lat_matr.append([])
+ for thread_ts in node_ts:
+ ndt = [(start + ln, 1000000. / val)
+ for (start, ln, val) in thread_ts.data]
+ new_ts = TimeSeriesValue(ndt)
+ iops_from_lat_matr[-1].append(new_ts)
+
+ mm_res['iops_from_lat'] = MeasurementMatrix(iops_from_lat_matr, conn_ids)
raw_res = {}
for conn_id in conn_ids:
@@ -96,7 +115,11 @@
fc = "{" + open(fn).read().split('{', 1)[1]
raw_res[conn_id] = json.loads(fc)
- return cls(params, res, raw_res)
+ fio_task = FioJobSection(params['name'])
+ fio_task.vals.update(params['vals'])
+
+ config = TestConfig('io', params, None, params['nodes'], folder, None)
+ return cls(config, fio_task, mm_res, raw_res, params['intervals'])
class Attrmapper(object):
@@ -157,25 +180,12 @@
return perc_50 / 1000., perc_95 / 1000.
-def prepare(ramp_time, data, avg_interval):
- if data is None:
- return data
-
- res = {}
- for key, ts_data in data.items():
- if ramp_time > 0:
- ts_data = ts_data.skip(ramp_time)
-
- res[key] = ts_data.derived(avg_interval)
- return res
-
-
class IOTestResult(TestResults):
"""
Fio run results
config: TestConfig
fio_task: FioJobSection
- ts_results: {str: TimeSeriesValue}
+ ts_results: {str: MeasurementMatrix[TimeSeriesValue]}
raw_result: ????
run_interval:(float, float) - test tun time, used for sensors
"""
@@ -184,11 +194,11 @@
self.name = fio_task.name.split("_")[0]
self.fio_task = fio_task
- ramp_time = fio_task.vals.get('ramp_time', 0)
+ self.bw = ts_results.get('bw')
+ self.lat = ts_results.get('lat')
+ self.iops = ts_results.get('iops')
+ self.iops_from_lat = ts_results.get('iops_from_lat')
- self.bw = prepare(ramp_time, ts_results.get('bw'), 1.0)
- self.lat = prepare(ramp_time, ts_results.get('lat'), 1.0)
- self.iops = prepare(ramp_time, ts_results.get('iops'), 1.0)
# self.slat = drop_warmup(res.get('clat', None), self.params)
# self.clat = drop_warmup(res.get('slat', None), self.params)
@@ -198,6 +208,26 @@
self._pinfo = None
TestResults.__init__(self, config, res, raw_result, run_interval)
+ def get_params_from_fio_report(self):
+ nodes = self.bw.connections_ids
+
+ iops = [self.raw_result[node]['jobs'][0]['mixed']['iops'] for node in nodes]
+ total_ios = [self.raw_result[node]['jobs'][0]['mixed']['total_ios'] for node in nodes]
+ runtime = [self.raw_result[node]['jobs'][0]['mixed']['runtime'] / 1000 for node in nodes]
+ flt_iops = [float(ios) / rtime for ios, rtime in zip(total_ios, runtime)]
+
+ bw = [self.raw_result[node]['jobs'][0]['mixed']['bw'] for node in nodes]
+ total_bytes = [self.raw_result[node]['jobs'][0]['mixed']['io_bytes'] for node in nodes]
+ flt_bw = [float(tbytes) / rtime for tbytes, rtime in zip(total_bytes, runtime)]
+
+ lat = [self.raw_result[node]['jobs'][0]['mixed']['lat'] for node in nodes]
+
+ return {'iops': iops,
+ 'flt_iops': flt_iops,
+ 'bw': bw,
+ 'flt_bw': flt_bw,
+ 'lat': lat}
+
def summary(self):
return get_test_summary(self.fio_task) + "vm" \
+ str(len(self.config.nodes))
@@ -205,8 +235,8 @@
def get_yamable(self):
return self.summary()
- @property
- def disk_perf_info(self):
+ def disk_perf_info(self, avg_interval=5.0):
+
if self._pinfo is not None:
return self._pinfo
@@ -228,22 +258,82 @@
for k, v in lat_mks.items():
lat_mks[k] = float(v) / num_res
- testnodes_count = len(self.fio_raw_res)
+ testnodes_count = len(self.config.nodes)
pinfo = DiskPerfInfo(self.name,
self.summary(),
self.params,
testnodes_count)
- pinfo.raw_bw = [res.vals() for res in self.bw.values()]
- pinfo.raw_iops = [res.vals() for res in self.iops.values()]
- pinfo.raw_lat = [res.vals() for res in self.lat.values()]
+ # ramp_time = self.fio_task.vals.get('ramp_time', 0)
- pinfo.bw = data_property(map(sum, zip(*pinfo.raw_bw)))
- pinfo.iops = data_property(map(sum, zip(*pinfo.raw_iops)))
- pinfo.lat = data_property(sum(pinfo.raw_lat, []))
+ def prepare(data):
+ if data is None:
+ return data
+
+ res = []
+ for ts_data in data:
+ # if ramp_time > 0:
+ # ts_data = ts_data.skip(ramp_time)
+
+ if ts_data.average_interval() < avg_interval:
+ ts_data = ts_data.derived(avg_interval)
+
+ res.append(ts_data.values)
+ return res
+
+ def agg_data(matr):
+ arr = sum(matr, [])
+ min_len = min(map(len, arr))
+ res = []
+ for idx in range(min_len):
+ res.append(sum(dt[idx] for dt in arr))
+ return res
+
+ pinfo.raw_lat = map(prepare, self.lat.per_vm())
+ num_th = sum(map(len, pinfo.raw_lat))
+ avg_lat = [val / num_th for val in agg_data(pinfo.raw_lat)]
+ pinfo.lat = data_property(avg_lat)
pinfo.lat_50, pinfo.lat_95 = get_lat_perc_50_95(lat_mks)
+ pinfo.raw_bw = map(prepare, self.bw.per_vm())
+ pinfo.raw_iops = map(prepare, self.iops.per_vm())
+
+ iops_per_th = sum(sum(pinfo.raw_iops, []), [])
+
+ fparams = self.get_params_from_fio_report()
+ fio_report_bw = sum(fparams['flt_bw'])
+ fio_report_iops = sum(fparams['flt_iops'])
+
+ agg_bw = agg_data(pinfo.raw_bw)
+ agg_iops = agg_data(pinfo.raw_iops)
+
+ log_bw_avg = average(agg_bw)
+ log_iops_avg = average(agg_iops)
+
+ # update values to match average from fio report
+ coef_iops = fio_report_iops / float(log_iops_avg)
+ coef_bw = fio_report_bw / float(log_bw_avg)
+
+ bw_log = data_property([val * coef_bw for val in agg_bw])
+ iops_log = data_property([val * coef_iops for val in agg_iops])
+
+ bw_report = data_property([fio_report_bw])
+ iops_report = data_property([fio_report_iops])
+
+ # When IOPS/BW per thread is too low
+ # data from logs is rounded to match
+ if average(iops_per_th) > 10:
+ pinfo.bw = bw_log
+ pinfo.iops = iops_log
+ pinfo.bw2 = bw_report
+ pinfo.iops2 = iops_report
+ else:
+ pinfo.bw = bw_report
+ pinfo.iops = iops_report
+ pinfo.bw2 = bw_log
+ pinfo.iops2 = iops_log
+
self._pinfo = pinfo
return pinfo
@@ -280,6 +370,9 @@
self.err_out_file = self.join_remote("fio_err_out")
self.exit_code_file = self.join_remote("exit_code")
+ self.max_latency = get("max_lat", None)
+ self.min_bw_per_thread = get("max_bw", None)
+
self.use_sudo = get("use_sudo", True)
self.test_logging = get("test_logging", False)
@@ -365,7 +458,6 @@
raise OSError("Can't install - " + str(err))
def pre_run(self):
- prefill = False
prefill = self.config.options.get('prefill_files', True)
if prefill:
@@ -433,9 +525,21 @@
barrier = Barrier(len(self.config.nodes))
results = []
+ # set of OperationModeBlockSize str
+ # which should not ne tested anymore, as
+ # they already too slow with previous thread count
+ lat_bw_limit_reached = set()
+
with ThreadPoolExecutor(len(self.config.nodes)) as pool:
+ self.fio_configs.sort(key=lambda x: int(x.vals['numjobs']))
+
for pos, fio_cfg in enumerate(self.fio_configs):
- logger.info("Will run {0} test".format(fio_cfg.name))
+ test_descr = get_test_summary(fio_cfg.vals).split("th")[0]
+ if test_descr in lat_bw_limit_reached:
+ logger.info("Will skip {0} test due to lat/bw limits".format(fio_cfg.name))
+ continue
+ else:
+ logger.info("Will run {0} test".format(fio_cfg.name))
templ = "Test should takes about {0}." + \
" Should finish at {1}," + \
@@ -484,8 +588,18 @@
with open(os.path.join(self.config.log_directory, fname), "w") as fd:
fd.write(dumps(params))
- res = load_test_results(self.config.log_directory, pos)
+ res = load_test_results(IOTestResult, self.config.log_directory, pos)
results.append(res)
+
+ test_res = res.get_params_from_fio_report()
+ if self.max_latency is not None:
+ if self.max_latency < average(test_res['lat']):
+ lat_bw_limit_reached.add(test_descr)
+
+ if self.min_bw_per_thread is not None:
+ if self.min_bw_per_thread > average(test_res['bw']):
+ lat_bw_limit_reached.add(test_descr)
+
return results
def do_run(self, node, barrier, fio_cfg, pos, nolog=False):
@@ -630,7 +744,7 @@
return begin, end
@classmethod
- def format_for_console(cls, data, dinfo):
+ def format_for_console(cls, results):
"""
create a table with io performance report
for console
@@ -650,32 +764,29 @@
return (data.name.rsplit("_", 1)[0],
p['rw'],
- get_test_sync_mode(data.params),
+ get_test_sync_mode(data.params['vals']),
ssize2b(p['blocksize']),
- int(th_count) * data.testnodes_count)
+ int(th_count) * len(data.config.nodes))
tab = texttable.Texttable(max_width=120)
tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
- tab.set_cols_align(["l", "l", "r", "r", "r", "r", "r", "r", "r"])
- items = sorted(dinfo.values(), key=key_func)
-
- prev_k = None
header = ["Name", "Description", "iops\ncum", "KiBps\ncum",
"Cnf\n95%", "Dev%", "iops\nper vm", "KiBps\nper vm", "lat\nms"]
+ tab.set_cols_align(["l", "l"] + ['r'] * (len(header) - 2))
+ sep = ["-------", "-----------"] + ["---"] * (len(header) - 2)
+ tab.header(header)
- for data in items:
- curr_k = key_func(data)[:4]
-
+ prev_k = None
+ for item in sorted(results, key=key_func):
+ curr_k = key_func(item)[:4]
if prev_k is not None:
if prev_k != curr_k:
- tab.add_row(
- ["-------", "-----------", "-----", "------",
- "---", "----", "------", "---", "-----"])
+ tab.add_row(sep)
prev_k = curr_k
- test_dinfo = dinfo[(data.name, data.summary)]
+ test_dinfo = item.disk_perf_info()
iops, _ = test_dinfo.iops.rounded_average_conf()
@@ -687,18 +798,23 @@
lat, _ = test_dinfo.lat.rounded_average_conf()
lat = round_3_digit(int(lat) // 1000)
- iops_per_vm = round_3_digit(iops / data.testnodes_count)
- bw_per_vm = round_3_digit(bw / data.testnodes_count)
+ testnodes_count = len(item.config.nodes)
+ iops_per_vm = round_3_digit(iops / testnodes_count)
+ bw_per_vm = round_3_digit(bw / testnodes_count)
iops = round_3_digit(iops)
+ # iops_from_lat = round_3_digit(iops_from_lat)
bw = round_3_digit(bw)
- params = (data.name.rsplit('_', 1)[0],
- data.summary, int(iops), int(bw), str(conf_perc),
+ params = (item.name.rsplit('_', 1)[0],
+ item.summary(),
+ int(iops),
+ int(bw),
+ str(conf_perc),
str(dev_perc),
- int(iops_per_vm), int(bw_per_vm), lat)
+ int(iops_per_vm),
+ int(bw_per_vm),
+ lat)
tab.add_row(params)
- tab.header(header)
-
return tab.draw()
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index e8ec6f9..c08bd37 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -284,6 +284,10 @@
sec.vals['unified_rw_reporting'] = '1'
+ if isinstance(sec.vals['size'], Var):
+ raise ValueError("Variable {0} isn't provided".format(
+ sec.vals['size'].name))
+
sz = ssize2b(sec.vals['size'])
offset = sz * ((MAGIC_OFFSET * counter[0]) % 1.0)
offset = int(offset) // 1024 ** 2
@@ -294,6 +298,10 @@
if val.name in new_vars:
sec.vals[name] = new_vars[val.name]
+ for vl in sec.vals.values():
+ if isinstance(vl, Var):
+ raise ValueError("Variable {0} isn't provided".format(vl.name))
+
params = sec.vals.copy()
params['UNIQ'] = 'UN{0}'.format(counter[0])
params['COUNTER'] = str(counter[0])
@@ -404,20 +412,16 @@
'runcycle': argv_obj.runcycle,
}
- sliced_it = fio_cfg_compile(job_cfg, argv_obj.jobfile,
- params, **slice_params)
+ sec_it = fio_cfg_compile(job_cfg, argv_obj.jobfile,
+ params, **slice_params)
if argv_obj.action == 'estimate':
- sum_time = 0
- for cfg_slice in sliced_it:
- sum_time += sum(map(execution_time, cfg_slice))
- print sec_to_str(sum_time)
+ print sec_to_str(sum(map(execution_time, sec_it)))
elif argv_obj.action == 'num_tests':
- print sum(map(len, map(list, sliced_it)))
+ print sum(map(len, map(list, sec_it)))
elif argv_obj.action == 'compile':
splitter = "\n#" + "-" * 70 + "\n\n"
- for cfg_slice in sliced_it:
- print splitter.join(map(str, cfg_slice))
+ print splitter.join(map(str, sec_it))
return 0
diff --git a/wally/suits/io/formatter.py b/wally/suits/io/formatter.py
deleted file mode 100644
index 59691b2..0000000
--- a/wally/suits/io/formatter.py
+++ /dev/null
@@ -1,80 +0,0 @@
-import texttable
-
-from wally.utils import ssize2b
-from wally.statistic import round_3_digit
-from .fio_task_parser import get_test_sync_mode
-
-
-def getconc(data):
- th_count = data.params.vals.get('numjobs')
-
- if th_count is None:
- th_count = data.params.vals.get('concurence', 1)
- return th_count
-
-
-def key_func(data):
- p = data.params.vals
-
- th_count = getconc(data)
-
- return (data.name.rsplit("_", 1)[0],
- p['rw'],
- get_test_sync_mode(data.params),
- ssize2b(p['blocksize']),
- int(th_count) * data.testnodes_count)
-
-
-def format_results_for_console(dinfo):
- """
- create a table with io performance report
- for console
- """
- tab = texttable.Texttable(max_width=120)
- tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
- tab.set_cols_align(["l", "l", "r", "r", "r", "r", "r", "r", "r"])
-
- items = sorted(dinfo.values(), key=key_func)
-
- prev_k = None
- header = ["Name", "Description", "iops\ncum", "KiBps\ncum",
- "Cnf\n95%", "Dev%", "iops\nper vm", "KiBps\nper vm", "lat\nms"]
-
- for data in items:
- curr_k = key_func(data)[:4]
-
- if prev_k is not None:
- if prev_k != curr_k:
- tab.add_row(
- ["-------", "-----------", "-----", "------",
- "---", "----", "------", "---", "-----"])
-
- prev_k = curr_k
-
- test_dinfo = dinfo[(data.name, data.summary)]
-
- iops, _ = test_dinfo.iops.rounded_average_conf()
-
- bw, bw_conf = test_dinfo.bw.rounded_average_conf()
- _, bw_dev = test_dinfo.bw.rounded_average_dev()
- conf_perc = int(round(bw_conf * 100 / bw))
- dev_perc = int(round(bw_dev * 100 / bw))
-
- lat, _ = test_dinfo.lat.rounded_average_conf()
- lat = round_3_digit(int(lat) // 1000)
-
- iops_per_vm = round_3_digit(iops / data.testnodes_count)
- bw_per_vm = round_3_digit(bw / data.testnodes_count)
-
- iops = round_3_digit(iops)
- bw = round_3_digit(bw)
-
- params = (data.name.rsplit('_', 1)[0],
- data.summary, int(iops), int(bw), str(conf_perc),
- str(dev_perc),
- int(iops_per_vm), int(bw_per_vm), lat)
- tab.add_row(params)
-
- tab.header(header)
-
- return tab.draw()
diff --git a/wally/suits/io/hdd.cfg b/wally/suits/io/hdd.cfg
index 6d3107a..ede6de2 100644
--- a/wally/suits/io/hdd.cfg
+++ b/wally/suits/io/hdd.cfg
@@ -6,7 +6,7 @@
NUMJOBS={% 1, 3, 5, 10, 20, 40 %}
ramp_time=5
-runtime=30
+runtime=90
direct=1
# ---------------------------------------------------------------------
diff --git a/wally/suits/io/lat_vs_iops.cfg b/wally/suits/io/lat_vs_iops.cfg
index 16e73e2..8dfa6d2 100644
--- a/wally/suits/io/lat_vs_iops.cfg
+++ b/wally/suits/io/lat_vs_iops.cfg
@@ -12,27 +12,27 @@
# ---------------------------------------------------------------------
# latency as function from IOPS
# ---------------------------------------------------------------------
-[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+[latVSiops{rate_iops}_{TEST_SUMM}]
numjobs=1
rate_iops={% 20, 40, 60, 80, 100, 120, 160, 200, 250, 300 %}
# ---------------------------------------------------------------------
# latency as function from IOPS
# ---------------------------------------------------------------------
-[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+[latVSiops{rate_iops}_{TEST_SUMM}]
numjobs=3
rate_iops={% 10, 20, 40, 60, 80, 100, 120, 160 %}
# ---------------------------------------------------------------------
# latency as function from IOPS
# ---------------------------------------------------------------------
-[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+[latVSiops{rate_iops}_{TEST_SUMM}]
numjobs=7
rate_iops={% 5, 10, 20, 40, 50, 60, 70 %}
# ---------------------------------------------------------------------
# latency as function from IOPS
# ---------------------------------------------------------------------
-[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+[latVSiops{rate_iops}_{TEST_SUMM}]
numjobs=10
rate_iops={% 5, 10, 20, 40, 50 %}
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index 42ce09f..4cf9ca9 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -2,12 +2,39 @@
include defaults.cfg
size={TEST_FILE_SIZE}
-ramp_time=0
-runtime=5
# ---------------------------------------------------------------------
[rws_{TEST_SUMM}]
blocksize=4k
rw=randwrite
sync=1
+ramp_time=0
+runtime=60
+numjobs=50
+# ---------------------------------------------------------------------
+# [rws_{TEST_SUMM}]
+# blocksize=4k
+# rw=randwrite
+# sync=1
+# ramp_time=0
+# runtime=60
+# numjobs=10
+
+# ---------------------------------------------------------------------
+# [rws_{TEST_SUMM}]
+# blocksize=4k
+# rw=randwrite
+# sync=1
+# ramp_time=10
+# runtime=60
+# numjobs=10
+
+# ---------------------------------------------------------------------
+# [rws_{TEST_SUMM}]
+# blocksize=4k
+# rw=randwrite
+# sync=1
+# ramp_time=10
+# runtime=60
+# numjobs=1
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 7564722..af72084 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -80,8 +80,9 @@
"""
data:[[MeasurementResult]] - VM_COUNT x TH_COUNT matrix of MeasurementResult
"""
- def __init__(self, data):
+ def __init__(self, data, connections_ids):
self.data = data
+ self.connections_ids = connections_ids
def per_vm(self):
return self.data
@@ -108,51 +109,59 @@
class TimeSeriesValue(MeasurementResults):
"""
- values:[(float, float, float)] - list of (start_time, lenght, average_value_for_interval)
+ data:[(float, float, float)] - list of (start_time, lenght, average_value_for_interval)
+ odata: original values
"""
def __init__(self, data):
assert len(data) > 0
- data = [(0, 0)] + data
+ self.odata = data[:]
+ self.data = []
- self.values = []
- for (cstart, cval), (nstart, nval) in zip(data[:-1], data[1:]):
- self.values.append((cstart, nstart - cstart, nval))
+ cstart = 0
+ for nstart, nval in data:
+ self.data.append((cstart, nstart - cstart, nval))
+ cstart = nstart
@property
def values(self):
return [val[2] for val in self.data]
+ def average_interval(self):
+ return float(sum([val[1] for val in self.data])) / len(self.data)
+
def skip(self, seconds):
nres = []
- for start, ln, val in enumerate(self.data):
- if start + ln < seconds:
- continue
- elif start > seconds:
- nres.append([start + ln - seconds, val])
- else:
- nres.append([0, val])
+ for start, ln, val in self.data:
+ nstart = start + ln - seconds
+ if nstart > 0:
+ nres.append([nstart, val])
return self.__class__(nres)
def derived(self, tdelta):
- end = tdelta
- res = [[end, 0.0]]
+ end = self.data[-1][0] + self.data[-1][1]
tdelta = float(tdelta)
+ ln = end / tdelta
+
+ if ln - int(ln) > 0:
+ ln += 1
+
+ res = [[tdelta * i, 0.0] for i in range(int(ln))]
+
for start, lenght, val in self.data:
- if start < end:
- ln = min(end, start + lenght) - start
- res[-1][1] += val * ln / tdelta
+ start_idx = int(start / tdelta)
+ end_idx = int((start + lenght) / tdelta)
- if end <= start + lenght:
- end += tdelta
- res.append([end, 0.0])
- while end < start + lenght:
- res[-1][1] = val
- res.append([end, 0.0])
- end += tdelta
+ for idx in range(start_idx, end_idx + 1):
+ rstart = tdelta * idx
+ rend = tdelta * (idx + 1)
- if res[-1][1] < 1:
- res = res[:-1]
+ intersection_ln = min(rend, start + lenght) - max(start, rstart)
+ if intersection_ln > 0:
+ try:
+ res[idx][1] += val * intersection_ln / tdelta
+ except IndexError:
+ raise
return self.__class__(res)