v2 is coming

Longer runtimes for the ceph/hdd suites, an extra IOPS series derived
from latency logs, log averages rescaled to match the fio report,
early skipping of thread counts that already exceeded latency/bandwidth
limits, validation of unset template variables, and formatter.py folded
into fio.py's format_for_console.
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index 4a651e6..fc96a52 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -5,7 +5,7 @@
NUMJOBS_SHORT={% 1, 2, 3, 10 %}
ramp_time=15
-runtime=60
+runtime=120
# ---------------------------------------------------------------------
# check different thread count, sync mode. (latency, iops) = func(th_count)
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index c650069..2a76fc2 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -15,13 +15,14 @@
from concurrent.futures import ThreadPoolExecutor
from wally.pretty_yaml import dumps
-from wally.statistic import round_3_digit, data_property
+from wally.statistic import round_3_digit, data_property, average
from wally.utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask, reconnect)
from .fio_task_parser import (execution_time, fio_cfg_compile,
- get_test_summary, get_test_sync_mode)
-from ..itest import TimeSeriesValue, PerfTest, TestResults, run_on_node
+ get_test_summary, get_test_sync_mode, FioJobSection)
+from ..itest import (TimeSeriesValue, PerfTest, TestResults,
+ run_on_node, TestConfig, MeasurementMatrix)
logger = logging.getLogger("wally")
@@ -52,7 +53,12 @@
def load_fio_log_file(fname):
with open(fname) as fd:
it = [ln.split(',')[:2] for ln in fd]
- vals = [(float(off) / 1000, float(val.strip())) for off, val in it]
+
+ vals = [(float(off) / 1000, # convert us to ms
+             float(val.strip()) + 0.5) # add 0.5 to compensate the average value,
+             # as fio truncates all log values to integers
+ for off, val in it]
+
return TimeSeriesValue(vals)
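
The +0.5 above is a bias correction: fio truncates each logged value to an
integer, so a mean taken over the log underestimates the true mean by about
0.5 whenever the fractional parts are roughly uniform. A minimal sketch of
the effect (hypothetical numbers, not wally code):

import random

# fio floors each logged value, so mean(floor(x)) ~= mean(x) - 0.5
# when the fractional parts of x are roughly uniform.
true_vals = [random.uniform(100, 200) for _ in range(100000)]
logged = [float(int(v)) for v in true_vals]    # what fio writes to the log
corrected = [v + 0.5 for v in logged]          # the compensation applied above

print(sum(true_vals) / len(true_vals))    # ~150.0
print(sum(logged) / len(logged))          # ~149.5, biased low
print(sum(corrected) / len(corrected))    # ~150.0 again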
@@ -63,9 +69,9 @@
fn = os.path.join(folder, str(run_num) + '_params.yaml')
params = yaml.load(open(fn).read())
- conn_ids = set()
+ conn_ids_set = set()
+ rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
for fname in os.listdir(folder):
- rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
rm = re.match(rr, fname)
if rm is None:
continue
@@ -77,16 +83,29 @@
if ftype not in ('iops', 'bw', 'lat'):
continue
- try:
- ts = load_fio_log_file(os.path.join(folder, fname))
- if ftype in res:
- assert conn_id not in res[ftype]
+ ts = load_fio_log_file(os.path.join(folder, fname))
+ res.setdefault(ftype, {}).setdefault(conn_id, []).append(ts)
- res.setdefault(ftype, {})[conn_id] = ts
- except AssertionError:
- pass
+ conn_ids_set.add(conn_id)
- conn_ids.add(conn_id)
+ mm_res = {}
+
+ for key, data in res.items():
+ conn_ids = sorted(conn_ids_set)
+ matr = [data[conn_id] for conn_id in conn_ids]
+
+ mm_res[key] = MeasurementMatrix(matr, conn_ids)
+
+ iops_from_lat_matr = []
+ for node_ts in mm_res['lat'].data:
+ iops_from_lat_matr.append([])
+ for thread_ts in node_ts:
+ ndt = [(start + ln, 1000000. / val)
+ for (start, ln, val) in thread_ts.data]
+ new_ts = TimeSeriesValue(ndt)
+ iops_from_lat_matr[-1].append(new_ts)
+
+ mm_res['iops_from_lat'] = MeasurementMatrix(iops_from_lat_matr, conn_ids)
raw_res = {}
for conn_id in conn_ids:
@@ -96,7 +115,11 @@
fc = "{" + open(fn).read().split('{', 1)[1]
raw_res[conn_id] = json.loads(fc)
- return cls(params, res, raw_res)
+ fio_task = FioJobSection(params['name'])
+ fio_task.vals.update(params['vals'])
+
+ config = TestConfig('io', params, None, params['nodes'], folder, None)
+ return cls(config, fio_task, mm_res, raw_res, params['intervals'])
class Attrmapper(object):
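
The iops_from_lat matrix built above inverts each latency sample: fio latency
logs are in microseconds per operation, so a thread with one outstanding I/O
that completes an operation every val microseconds sustains 1000000 / val
IOPS. A standalone sketch of the conversion, assuming samples of
(start, length, latency_us) as in the loop above:

# Hypothetical standalone version of the lat -> iops conversion.
# One outstanding I/O finishing every `val` microseconds means
# 1000000 / val operations per second.
def iops_from_latency(samples):
    return [(start + ln, 1000000. / val) for (start, ln, val) in samples]

print(iops_from_latency([(0.0, 1.0, 2000.0)]))  # [(1.0, 500.0)]: 2ms -> 500 IOPS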
@@ -157,25 +180,12 @@
return perc_50 / 1000., perc_95 / 1000.
-def prepare(ramp_time, data, avg_interval):
- if data is None:
- return data
-
- res = {}
- for key, ts_data in data.items():
- if ramp_time > 0:
- ts_data = ts_data.skip(ramp_time)
-
- res[key] = ts_data.derived(avg_interval)
- return res
-
-
class IOTestResult(TestResults):
"""
Fio run results
config: TestConfig
fio_task: FioJobSection
- ts_results: {str: TimeSeriesValue}
+ ts_results: {str: MeasurementMatrix[TimeSeriesValue]}
raw_result: ????
run_interval:(float, float) - test run time, used for sensors
"""
@@ -184,11 +194,11 @@
self.name = fio_task.name.split("_")[0]
self.fio_task = fio_task
- ramp_time = fio_task.vals.get('ramp_time', 0)
+ self.bw = ts_results.get('bw')
+ self.lat = ts_results.get('lat')
+ self.iops = ts_results.get('iops')
+ self.iops_from_lat = ts_results.get('iops_from_lat')
- self.bw = prepare(ramp_time, ts_results.get('bw'), 1.0)
- self.lat = prepare(ramp_time, ts_results.get('lat'), 1.0)
- self.iops = prepare(ramp_time, ts_results.get('iops'), 1.0)
# self.slat = drop_warmup(res.get('clat', None), self.params)
# self.clat = drop_warmup(res.get('slat', None), self.params)
@@ -198,6 +208,26 @@
self._pinfo = None
TestResults.__init__(self, config, res, raw_result, run_interval)
+ def get_params_from_fio_report(self):
+ nodes = self.bw.connections_ids
+
+ iops = [self.raw_result[node]['jobs'][0]['mixed']['iops'] for node in nodes]
+ total_ios = [self.raw_result[node]['jobs'][0]['mixed']['total_ios'] for node in nodes]
+ runtime = [self.raw_result[node]['jobs'][0]['mixed']['runtime'] / 1000 for node in nodes]
+ flt_iops = [float(ios) / rtime for ios, rtime in zip(total_ios, runtime)]
+
+ bw = [self.raw_result[node]['jobs'][0]['mixed']['bw'] for node in nodes]
+ total_bytes = [self.raw_result[node]['jobs'][0]['mixed']['io_bytes'] for node in nodes]
+ flt_bw = [float(tbytes) / rtime for tbytes, rtime in zip(total_bytes, runtime)]
+
+ lat = [self.raw_result[node]['jobs'][0]['mixed']['lat'] for node in nodes]
+
+ return {'iops': iops,
+ 'flt_iops': flt_iops,
+ 'bw': bw,
+ 'flt_bw': flt_bw,
+ 'lat': lat}
+
def summary(self):
return get_test_summary(self.fio_task) + "vm" \
+ str(len(self.config.nodes))
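
get_params_from_fio_report prefers recomputed float rates over fio's integer
fields: the report's 'iops' is rounded, which loses precision at low rates,
while total_ios / runtime keeps the fraction. A toy illustration with
made-up numbers:

# fio's integer 'iops' field vs. the recomputed float value.
total_ios = 95                     # operations completed in the run
runtime_s = 60000 / 1000           # fio reports runtime in ms; 60 s here
flt_iops = float(total_ios) / runtime_s
print(flt_iops)                    # 1.5833..., where fio's 'iops' would say 1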
@@ -205,8 +235,8 @@
def get_yamable(self):
return self.summary()
- @property
- def disk_perf_info(self):
+ def disk_perf_info(self, avg_interval=5.0):
+
if self._pinfo is not None:
return self._pinfo
@@ -228,22 +258,82 @@
for k, v in lat_mks.items():
lat_mks[k] = float(v) / num_res
- testnodes_count = len(self.fio_raw_res)
+ testnodes_count = len(self.config.nodes)
pinfo = DiskPerfInfo(self.name,
self.summary(),
self.params,
testnodes_count)
- pinfo.raw_bw = [res.vals() for res in self.bw.values()]
- pinfo.raw_iops = [res.vals() for res in self.iops.values()]
- pinfo.raw_lat = [res.vals() for res in self.lat.values()]
+ # ramp_time = self.fio_task.vals.get('ramp_time', 0)
- pinfo.bw = data_property(map(sum, zip(*pinfo.raw_bw)))
- pinfo.iops = data_property(map(sum, zip(*pinfo.raw_iops)))
- pinfo.lat = data_property(sum(pinfo.raw_lat, []))
+ def prepare(data):
+ if data is None:
+ return data
+
+ res = []
+ for ts_data in data:
+ # if ramp_time > 0:
+ # ts_data = ts_data.skip(ramp_time)
+
+ if ts_data.average_interval() < avg_interval:
+ ts_data = ts_data.derived(avg_interval)
+
+ res.append(ts_data.values)
+ return res
+
+ def agg_data(matr):
+ arr = sum(matr, [])
+ min_len = min(map(len, arr))
+ res = []
+ for idx in range(min_len):
+ res.append(sum(dt[idx] for dt in arr))
+ return res
+
+ pinfo.raw_lat = map(prepare, self.lat.per_vm())
+ num_th = sum(map(len, pinfo.raw_lat))
+ avg_lat = [val / num_th for val in agg_data(pinfo.raw_lat)]
+ pinfo.lat = data_property(avg_lat)
pinfo.lat_50, pinfo.lat_95 = get_lat_perc_50_95(lat_mks)
+ pinfo.raw_bw = map(prepare, self.bw.per_vm())
+ pinfo.raw_iops = map(prepare, self.iops.per_vm())
+
+ iops_per_th = sum(sum(pinfo.raw_iops, []), [])
+
+ fparams = self.get_params_from_fio_report()
+ fio_report_bw = sum(fparams['flt_bw'])
+ fio_report_iops = sum(fparams['flt_iops'])
+
+ agg_bw = agg_data(pinfo.raw_bw)
+ agg_iops = agg_data(pinfo.raw_iops)
+
+ log_bw_avg = average(agg_bw)
+ log_iops_avg = average(agg_iops)
+
+ # update values to match average from fio report
+ coef_iops = fio_report_iops / float(log_iops_avg)
+ coef_bw = fio_report_bw / float(log_bw_avg)
+
+ bw_log = data_property([val * coef_bw for val in agg_bw])
+ iops_log = data_property([val * coef_iops for val in agg_iops])
+
+ bw_report = data_property([fio_report_bw])
+ iops_report = data_property([fio_report_iops])
+
+            # When per-thread IOPS/BW is too low the log data
+            # suffers too much from fio's integer rounding, so
+            # the report averages are preferred in that case
+ if average(iops_per_th) > 10:
+ pinfo.bw = bw_log
+ pinfo.iops = iops_log
+ pinfo.bw2 = bw_report
+ pinfo.iops2 = iops_report
+ else:
+ pinfo.bw = bw_report
+ pinfo.iops = iops_report
+ pinfo.bw2 = bw_log
+ pinfo.iops2 = iops_log
+
self._pinfo = pinfo
return pinfo
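
The coefficient trick above keeps the shape of the per-interval curve while
forcing its mean to agree with fio's end-of-run totals, which are free of the
log rounding: coef = report_average / log_average, applied to every sample.
Worked through with made-up numbers:

# Hypothetical illustration of the rescaling.
agg_iops = [96.0, 98.0, 100.0]             # per-interval sums from the fio logs
fio_report_iops = 100.0                    # average taken from fio's report
log_avg = sum(agg_iops) / len(agg_iops)    # 98.0
coef = fio_report_iops / log_avg           # ~1.0204
rescaled = [v * coef for v in agg_iops]
print(sum(rescaled) / len(rescaled))       # 100.0: mean now matches the report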
@@ -280,6 +370,9 @@
self.err_out_file = self.join_remote("fio_err_out")
self.exit_code_file = self.join_remote("exit_code")
+ self.max_latency = get("max_lat", None)
+ self.min_bw_per_thread = get("max_bw", None)
+
self.use_sudo = get("use_sudo", True)
self.test_logging = get("test_logging", False)
@@ -365,7 +458,6 @@
raise OSError("Can't install - " + str(err))
def pre_run(self):
- prefill = False
prefill = self.config.options.get('prefill_files', True)
if prefill:
@@ -433,9 +525,21 @@
barrier = Barrier(len(self.config.nodes))
results = []
+        # set of OperationModeBlockSize strings
+        # which should not be tested anymore, as
+        # they were already too slow with the previous thread count
+ lat_bw_limit_reached = set()
+
with ThreadPoolExecutor(len(self.config.nodes)) as pool:
+ self.fio_configs.sort(key=lambda x: int(x.vals['numjobs']))
+
for pos, fio_cfg in enumerate(self.fio_configs):
- logger.info("Will run {0} test".format(fio_cfg.name))
+ test_descr = get_test_summary(fio_cfg.vals).split("th")[0]
+ if test_descr in lat_bw_limit_reached:
+ logger.info("Will skip {0} test due to lat/bw limits".format(fio_cfg.name))
+ continue
+ else:
+ logger.info("Will run {0} test".format(fio_cfg.name))
templ = "Test should takes about {0}." + \
" Should finish at {1}," + \
@@ -484,8 +588,18 @@
with open(os.path.join(self.config.log_directory, fname), "w") as fd:
fd.write(dumps(params))
- res = load_test_results(self.config.log_directory, pos)
+ res = load_test_results(IOTestResult, self.config.log_directory, pos)
results.append(res)
+
+ test_res = res.get_params_from_fio_report()
+ if self.max_latency is not None:
+ if self.max_latency < average(test_res['lat']):
+ lat_bw_limit_reached.add(test_descr)
+
+ if self.min_bw_per_thread is not None:
+ if self.min_bw_per_thread > average(test_res['bw']):
+ lat_bw_limit_reached.add(test_descr)
+
return results
def do_run(self, node, barrier, fio_cfg, pos, nolog=False):
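
Because self.fio_configs is sorted by numjobs, the lat_bw_limit_reached set
effectively prunes the heavy end of each test series: once a mode/blocksize
combination breaches max_lat, or drops below the per-thread bandwidth floor,
all later higher-thread-count variants of it are skipped. A minimal sketch of
that pruning, with hypothetical names and limits:

# Sketch of the limit-based pruning: runs are ordered by thread count,
# and a breached limit blacklists the whole (mode, blocksize) series.
max_latency = 50000                        # assumed limit, microseconds
lat_bw_limit_reached = set()

runs = [("rws4k", 1, 30000), ("rws4k", 5, 60000), ("rws4k", 10, 90000)]
for descr, numjobs, avg_lat in runs:
    if descr in lat_bw_limit_reached:
        print("skipping {0} x{1}".format(descr, numjobs))
        continue
    print("running {0} x{1}".format(descr, numjobs))
    if avg_lat > max_latency:
        lat_bw_limit_reached.add(descr)
# runs x1 and x5, trips the limit on x5, then skips x10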
@@ -630,7 +744,7 @@
return begin, end
@classmethod
- def format_for_console(cls, data, dinfo):
+ def format_for_console(cls, results):
"""
create a table with io performance report
for console
@@ -650,32 +764,29 @@
return (data.name.rsplit("_", 1)[0],
p['rw'],
- get_test_sync_mode(data.params),
+ get_test_sync_mode(data.params['vals']),
ssize2b(p['blocksize']),
- int(th_count) * data.testnodes_count)
+ int(th_count) * len(data.config.nodes))
tab = texttable.Texttable(max_width=120)
tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
- tab.set_cols_align(["l", "l", "r", "r", "r", "r", "r", "r", "r"])
- items = sorted(dinfo.values(), key=key_func)
-
- prev_k = None
header = ["Name", "Description", "iops\ncum", "KiBps\ncum",
"Cnf\n95%", "Dev%", "iops\nper vm", "KiBps\nper vm", "lat\nms"]
+ tab.set_cols_align(["l", "l"] + ['r'] * (len(header) - 2))
+ sep = ["-------", "-----------"] + ["---"] * (len(header) - 2)
+ tab.header(header)
- for data in items:
- curr_k = key_func(data)[:4]
-
+ prev_k = None
+ for item in sorted(results, key=key_func):
+ curr_k = key_func(item)[:4]
if prev_k is not None:
if prev_k != curr_k:
- tab.add_row(
- ["-------", "-----------", "-----", "------",
- "---", "----", "------", "---", "-----"])
+ tab.add_row(sep)
prev_k = curr_k
- test_dinfo = dinfo[(data.name, data.summary)]
+ test_dinfo = item.disk_perf_info()
iops, _ = test_dinfo.iops.rounded_average_conf()
@@ -687,18 +798,23 @@
lat, _ = test_dinfo.lat.rounded_average_conf()
lat = round_3_digit(int(lat) // 1000)
- iops_per_vm = round_3_digit(iops / data.testnodes_count)
- bw_per_vm = round_3_digit(bw / data.testnodes_count)
+ testnodes_count = len(item.config.nodes)
+ iops_per_vm = round_3_digit(iops / testnodes_count)
+ bw_per_vm = round_3_digit(bw / testnodes_count)
iops = round_3_digit(iops)
+ # iops_from_lat = round_3_digit(iops_from_lat)
bw = round_3_digit(bw)
- params = (data.name.rsplit('_', 1)[0],
- data.summary, int(iops), int(bw), str(conf_perc),
+ params = (item.name.rsplit('_', 1)[0],
+ item.summary(),
+ int(iops),
+ int(bw),
+ str(conf_perc),
str(dev_perc),
- int(iops_per_vm), int(bw_per_vm), lat)
+ int(iops_per_vm),
+ int(bw_per_vm),
+ lat)
tab.add_row(params)
- tab.header(header)
-
return tab.draw()
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index e8ec6f9..c08bd37 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -284,6 +284,10 @@
sec.vals['unified_rw_reporting'] = '1'
+ if isinstance(sec.vals['size'], Var):
+ raise ValueError("Variable {0} isn't provided".format(
+ sec.vals['size'].name))
+
sz = ssize2b(sec.vals['size'])
offset = sz * ((MAGIC_OFFSET * counter[0]) % 1.0)
offset = int(offset) // 1024 ** 2
@@ -294,6 +298,10 @@
if val.name in new_vars:
sec.vals[name] = new_vars[val.name]
+ for vl in sec.vals.values():
+ if isinstance(vl, Var):
+ raise ValueError("Variable {0} isn't provided".format(vl.name))
+
params = sec.vals.copy()
params['UNIQ'] = 'UN{0}'.format(counter[0])
params['COUNTER'] = str(counter[0])
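
Both new checks fail fast on template variables that were never substituted:
any Var instance left in sec.vals means a parameter was not supplied, and
raising here beats emitting a fio job file with a literal placeholder in it.
A standalone mirror of the check (this Var is a stand-in for the parser's
placeholder type):

# Hypothetical mirror of the unset-variable validation above.
class Var(object):
    def __init__(self, name):
        self.name = name

vals = {'size': Var('TEST_FILE_SIZE'), 'blocksize': '4k'}
for name, vl in vals.items():
    if isinstance(vl, Var):
        raise ValueError("Variable {0} isn't provided".format(vl.name))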
@@ -404,20 +412,16 @@
'runcycle': argv_obj.runcycle,
}
- sliced_it = fio_cfg_compile(job_cfg, argv_obj.jobfile,
- params, **slice_params)
+ sec_it = fio_cfg_compile(job_cfg, argv_obj.jobfile,
+ params, **slice_params)
if argv_obj.action == 'estimate':
- sum_time = 0
- for cfg_slice in sliced_it:
- sum_time += sum(map(execution_time, cfg_slice))
- print sec_to_str(sum_time)
+ print sec_to_str(sum(map(execution_time, sec_it)))
elif argv_obj.action == 'num_tests':
- print sum(map(len, map(list, sliced_it)))
+ print sum(map(len, map(list, sec_it)))
elif argv_obj.action == 'compile':
splitter = "\n#" + "-" * 70 + "\n\n"
- for cfg_slice in sliced_it:
- print splitter.join(map(str, cfg_slice))
+ print splitter.join(map(str, sec_it))
return 0
diff --git a/wally/suits/io/formatter.py b/wally/suits/io/formatter.py
deleted file mode 100644
index 59691b2..0000000
--- a/wally/suits/io/formatter.py
+++ /dev/null
@@ -1,80 +0,0 @@
-import texttable
-
-from wally.utils import ssize2b
-from wally.statistic import round_3_digit
-from .fio_task_parser import get_test_sync_mode
-
-
-def getconc(data):
- th_count = data.params.vals.get('numjobs')
-
- if th_count is None:
- th_count = data.params.vals.get('concurence', 1)
- return th_count
-
-
-def key_func(data):
- p = data.params.vals
-
- th_count = getconc(data)
-
- return (data.name.rsplit("_", 1)[0],
- p['rw'],
- get_test_sync_mode(data.params),
- ssize2b(p['blocksize']),
- int(th_count) * data.testnodes_count)
-
-
-def format_results_for_console(dinfo):
- """
- create a table with io performance report
- for console
- """
- tab = texttable.Texttable(max_width=120)
- tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
- tab.set_cols_align(["l", "l", "r", "r", "r", "r", "r", "r", "r"])
-
- items = sorted(dinfo.values(), key=key_func)
-
- prev_k = None
- header = ["Name", "Description", "iops\ncum", "KiBps\ncum",
- "Cnf\n95%", "Dev%", "iops\nper vm", "KiBps\nper vm", "lat\nms"]
-
- for data in items:
- curr_k = key_func(data)[:4]
-
- if prev_k is not None:
- if prev_k != curr_k:
- tab.add_row(
- ["-------", "-----------", "-----", "------",
- "---", "----", "------", "---", "-----"])
-
- prev_k = curr_k
-
- test_dinfo = dinfo[(data.name, data.summary)]
-
- iops, _ = test_dinfo.iops.rounded_average_conf()
-
- bw, bw_conf = test_dinfo.bw.rounded_average_conf()
- _, bw_dev = test_dinfo.bw.rounded_average_dev()
- conf_perc = int(round(bw_conf * 100 / bw))
- dev_perc = int(round(bw_dev * 100 / bw))
-
- lat, _ = test_dinfo.lat.rounded_average_conf()
- lat = round_3_digit(int(lat) // 1000)
-
- iops_per_vm = round_3_digit(iops / data.testnodes_count)
- bw_per_vm = round_3_digit(bw / data.testnodes_count)
-
- iops = round_3_digit(iops)
- bw = round_3_digit(bw)
-
- params = (data.name.rsplit('_', 1)[0],
- data.summary, int(iops), int(bw), str(conf_perc),
- str(dev_perc),
- int(iops_per_vm), int(bw_per_vm), lat)
- tab.add_row(params)
-
- tab.header(header)
-
- return tab.draw()
diff --git a/wally/suits/io/hdd.cfg b/wally/suits/io/hdd.cfg
index 6d3107a..ede6de2 100644
--- a/wally/suits/io/hdd.cfg
+++ b/wally/suits/io/hdd.cfg
@@ -6,7 +6,7 @@
NUMJOBS={% 1, 3, 5, 10, 20, 40 %}
ramp_time=5
-runtime=30
+runtime=90
direct=1
# ---------------------------------------------------------------------
diff --git a/wally/suits/io/lat_vs_iops.cfg b/wally/suits/io/lat_vs_iops.cfg
index 16e73e2..8dfa6d2 100644
--- a/wally/suits/io/lat_vs_iops.cfg
+++ b/wally/suits/io/lat_vs_iops.cfg
@@ -12,27 +12,27 @@
# ---------------------------------------------------------------------
# latency as function from IOPS
# ---------------------------------------------------------------------
-[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+[latVSiops{rate_iops}_{TEST_SUMM}]
numjobs=1
rate_iops={% 20, 40, 60, 80, 100, 120, 160, 200, 250, 300 %}
# ---------------------------------------------------------------------
# latency as function from IOPS
# ---------------------------------------------------------------------
-[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+[latVSiops{rate_iops}_{TEST_SUMM}]
numjobs=3
rate_iops={% 10, 20, 40, 60, 80, 100, 120, 160 %}
# ---------------------------------------------------------------------
# latency as function from IOPS
# ---------------------------------------------------------------------
-[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+[latVSiops{rate_iops}_{TEST_SUMM}]
numjobs=7
rate_iops={% 5, 10, 20, 40, 50, 60, 70 %}
# ---------------------------------------------------------------------
# latency as function from IOPS
# ---------------------------------------------------------------------
-[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+[latVSiops{rate_iops}_{TEST_SUMM}]
numjobs=10
rate_iops={% 5, 10, 20, 40, 50 %}
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index 42ce09f..4cf9ca9 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -2,12 +2,39 @@
include defaults.cfg
size={TEST_FILE_SIZE}
-ramp_time=0
-runtime=5
# ---------------------------------------------------------------------
[rws_{TEST_SUMM}]
blocksize=4k
rw=randwrite
sync=1
+ramp_time=0
+runtime=60
+numjobs=50
+# ---------------------------------------------------------------------
+# [rws_{TEST_SUMM}]
+# blocksize=4k
+# rw=randwrite
+# sync=1
+# ramp_time=0
+# runtime=60
+# numjobs=10
+
+# ---------------------------------------------------------------------
+# [rws_{TEST_SUMM}]
+# blocksize=4k
+# rw=randwrite
+# sync=1
+# ramp_time=10
+# runtime=60
+# numjobs=10
+
+# ---------------------------------------------------------------------
+# [rws_{TEST_SUMM}]
+# blocksize=4k
+# rw=randwrite
+# sync=1
+# ramp_time=10
+# runtime=60
+# numjobs=1