pre-release updates, bug fixes
diff --git a/chart/charts.py b/chart/charts.py
index b53ac5a..f985526 100644
--- a/chart/charts.py
+++ b/chart/charts.py
@@ -7,7 +7,6 @@
from GChartWrapper import constants
from config import cfg_dict
-CHARTS_IMG_PATH = cfg_dict['charts_img_path']
COLORS = ["1569C7", "81D8D0", "307D7E", "5CB3FF", "0040FF", "81DAF5"]
@@ -54,6 +53,7 @@
}
}
"""
+
bar = VerticalBarGroup([], encoding='text')
bar.title(title)
@@ -71,7 +71,8 @@
deviations.extend(zip(*display_dev))
# bar.dataset(values + deviations, series=len(values))
- bar.dataset(values + deviations + [l[0] for l in lines], series=len(values))
+ bar.dataset(values + deviations + [l[0] for l in lines],
+ series=len(values))
bar.axes.type('xyy')
bar.axes.label(2, None, label_x)
if scale_x:
@@ -114,7 +115,7 @@
bar.legend(*legend)
bar.scale(*scale)
img_name = hashlib.md5(str(bar)).hexdigest() + ".png"
- img_path = os.path.join(CHARTS_IMG_PATH, img_name)
+ img_path = os.path.join(cfg_dict['charts_img_path'], img_name)
if not os.path.exists(img_path):
save_image(bar, img_path)
return str(bar)
@@ -136,7 +137,7 @@
line.size(width, height)
img_name = hashlib.md5(str(line)).hexdigest() + ".png"
- img_path = os.path.join(CHARTS_IMG_PATH, img_name)
+ img_path = os.path.join(cfg_dict['charts_img_path'], img_name)
if not os.path.exists(img_path):
save_image(line, img_path)
return str(line)
diff --git a/config.py b/config.py
index 6a162b8..e7830bb 100644
--- a/config.py
+++ b/config.py
@@ -1,45 +1,30 @@
import os
-# import sys
import yaml
-# import argparse
+
+from petname import Generate as pet_generate
-def parse_config(file_name):
- with open(file_name) as f:
- cfg = yaml.load(f.read())
-
- return cfg
+cfg_dict = {}
-# WTX???
-# parser = argparse.ArgumentParser(description="config file name")
-# parser.add_argument("-p", "--path")
-# parser.add_argument("-b", "--basedir")
-# parser.add_argument("-t", "--testpath")
-# parser.add_argument("-d", "--database")
-# parser.add_argument("-c", "--chartpath")
+def load_config(file_name):
+ global cfg_dict
+ first_load = len(cfg_dict) == 0
+ cfg_dict.update(yaml.load(open(file_name).read()))
-# config = parser.parse_args(sys.argv[1:])
-path = "koder.yaml"
+ if first_load:
+ var_dir = cfg_dict.get('internal', {}).get('var_dir_root', '/tmp')
-# if config.path is not None:
-# path = config.path
+ while True:
+ dr = os.path.join(var_dir, pet_generate(2, "_"))
+ if not os.path.exists(dr):
+ break
-cfg_dict = parse_config(path)
-# basedir = cfg_dict['paths']['basedir']
-# TEST_PATH = cfg_dict['paths']['TEST_PATH']
-# SQLALCHEMY_MIGRATE_REPO = cfg_dict['paths']['SQLALCHEMY_MIGRATE_REPO']
-# DATABASE_URI = cfg_dict['paths']['DATABASE_URI']
-# CHARTS_IMG_PATH = cfg_dict['paths']['CHARTS_IMG_PATH']
+ cfg_dict['var_dir'] = dr
+ os.makedirs(cfg_dict['var_dir'])
-# if config.basedir is not None:
-# basedir = config.basedir
+ cfg_dict['charts_img_path'] = os.path.join(cfg_dict['var_dir'], 'charts')
+ os.makedirs(cfg_dict['charts_img_path'])
-# if config.testpath is not None:
-# TEST_PATH = config.testpath
-
-# if config.database is not None:
-# DATABASE_URI = config.database
-
-# if config.chartpath is not None:
-# CHARTS_IMG_PATH = config.chartpath
+ cfg_dict['vm_ids_fname'] = os.path.join(cfg_dict['var_dir'], 'os_vm_ids')
+ cfg_dict['report'] = os.path.join(cfg_dict['var_dir'], 'report.html')
diff --git a/formatters.py b/formatters.py
index 2a47649..bbbf8d2 100644
--- a/formatters.py
+++ b/formatters.py
@@ -1,71 +1,128 @@
import itertools
-import json
-import math
+from collections import defaultdict
+
+import texttable
+
+from statistic import med_dev
+
+# [{u'__meta__': {u'raw_cfg': u'[writetest * 2]\ngroup_reporting\nnumjobs=4\nwait_for_previous\nramp_time=5\nblocksize=4k\nfilename={FILENAME}\nrw=randwrite\ndirect=1\nbuffered=0\niodepth=1\nsize=100Mb\nruntime=10\ntime_based\n\n[readtest * 2]\ngroup_reporting\nnumjobs=4\nwait_for_previous\nramp_time=5\nblocksize=4k\nfilename={FILENAME}\nrw=randread\ndirect=1\nbuffered=0\niodepth=1\nsize=100Mb\nruntime=10\ntime_based\n'},
+# u'res': {u'readtest': {u'action': u'randread',
+# u'blocksize': u'4k',
+# u'bw_mean': [349.61, 276.54],
+# u'clat': [11987.16, 15235.08],
+# u'concurence': 4,
+# u'direct_io': True,
+# u'iops': [316, 251],
+# u'jobname': u'readtest',
+# u'lat': [11987.52, 15235.46],
+# u'slat': [0.0, 0.0],
+# u'sync': False,
+# u'timings': [u'10', u'5']},
+# u'writetest': {u'action': u'randwrite',
+# u'blocksize': u'4k',
+# u'bw_mean': [72.03, 61.84],
+# u'clat': [113525.86, 152836.42],
+# u'concurence': 4,
+# u'direct_io': True,
+# u'iops': [35, 23],
+# u'jobname': u'writetest',
+# u'lat': [113526.31, 152836.89],
+# u'slat': [0.0, 0.0],
+# u'sync': False,
+# u'timings': [u'10', u'5']}}},
+# {u'__meta__': {u'raw_cfg': u'[writetest * 2]\ngroup_reporting\nnumjobs=4\nwait_for_previous\nramp_time=5\nblocksize=4k\nfilename={FILENAME}\nrw=randwrite\ndirect=1\nbuffered=0\niodepth=1\nsize=100Mb\nruntime=10\ntime_based\n\n[readtest * 2]\ngroup_reporting\nnumjobs=4\nwait_for_previous\nramp_time=5\nblocksize=4k\nfilename={FILENAME}\nrw=randread\ndirect=1\nbuffered=0\niodepth=1\nsize=100Mb\nruntime=10\ntime_based\n'},
+# u'res': {u'readtest': {u'action': u'randread',
+# u'blocksize': u'4k',
+# u'bw_mean': [287.62, 280.76],
+# u'clat': [15437.57, 14741.65],
+# u'concurence': 4,
+# u'direct_io': True,
+# u'iops': [258, 271],
+# u'jobname': u'readtest',
+# u'lat': [15437.94, 14742.04],
+# u'slat': [0.0, 0.0],
+# u'sync': False,
+# u'timings': [u'10', u'5']},
+# u'writetest': {u'action': u'randwrite',
+# u'blocksize': u'4k',
+# u'bw_mean': [71.18, 61.62],
+# u'clat': [116382.95, 153486.81],
+# u'concurence': 4,
+# u'direct_io': True,
+# u'iops': [32, 22],
+# u'jobname': u'writetest',
+# u'lat': [116383.44, 153487.27],
+# u'slat': [0.0, 0.0],
+# u'sync': False,
+# u'timings': [u'10', u'5']}}}]
-def get_formatter(test_type):
- if test_type == "iozone" or test_type == "fio":
- return format_io_stat
- elif test_type == "pgbench":
- return format_pgbench_stat
+def get_test_descr(data):
+ rw = {"randread": "rr",
+ "randwrite": "rw",
+ "read": "sr",
+ "write": "sw"}[data["action"]]
+
+ if data["direct_io"]:
+ sync_mode = 'd'
+ elif data["sync"]:
+ sync_mode = 's'
else:
- raise Exception("Cannot get formatter for type %s" % test_type)
+ sync_mode = 'a'
+
+ th_count = int(data['concurence'])
+
+ return "{0}{1}{2}_th{3}".format(rw, sync_mode,
+ data['blocksize'], th_count)
-def format_io_stat(res):
- if len(res) != 0:
- bw_mean = 0.0
- for measurement in res:
- bw_mean += measurement["bw_mean"]
+def format_results_for_console(test_set):
+ data_for_print = []
+ tab = texttable.Texttable()
+ tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER | tab.HLINES)
+ tab.set_cols_align(["l", "r", "r", "r", "r"])
- bw_mean /= len(res)
+ for test_name, data in test_set['res'].items():
+ descr = get_test_descr(data)
- it = ((bw_mean - measurement["bw_mean"]) ** 2 for measurement in res)
- bw_dev = sum(it) ** 0.5
+ iops, _ = med_dev(data['iops'])
+ bw, bwdev = med_dev(data['bw_mean'])
- meta = res[0]['__meta__']
+ # 3 * sigma
+ dev_perc = int((bwdev * 300) / bw)
- sync = meta['sync']
- direct = meta['direct_io']
+ params = (descr, int(iops), int(bw), dev_perc,
+ int(med_dev(data['lat'])[0]) // 1000)
+ data_for_print.append(params)
- if sync and direct:
- ss = "d+"
- elif sync:
- ss = "s"
- elif direct:
- ss = "d"
- else:
- ss = "a"
+ header = ["Description", "IOPS", "BW KBps", "Dev * 3 %", "LAT ms"]
+ tab.add_row(header)
+ tab.header = header
- key = "{0} {1} {2} {3}k".format(meta['action'], ss,
- meta['concurence'],
- meta['blocksize'])
+ map(tab.add_row, data_for_print)
- data = json.dumps({key: (int(bw_mean), int(bw_dev))})
-
- return data
+ return tab.draw()
-def format_pgbench_stat(res):
- """
- Receives results in format:
- "<num_clients> <num_transactions>: <tps>
- <num_clients> <num_transactions>: <tps>
- ....
- "
- """
- if res:
- data = {}
- grouped_res = itertools.groupby(res, lambda x: x[0])
- for key, group in grouped_res:
- results = list(group)
- sum_res = sum([r[1] for r in results])
- mean = sum_res/len(results)
- sum_sq = sum([(r[1] - mean) ** 2 for r in results])
- if len(results) > 1:
- dev = math.sqrt(sum_sq / (len(results) - 1))
- else:
- dev = 0
- data[key] = (mean, dev)
- return data
-
+# def format_pgbench_stat(res):
+# """
+# Receives results in format:
+# "<num_clients> <num_transactions>: <tps>
+# <num_clients> <num_transactions>: <tps>
+# ....
+# "
+# """
+# if res:
+# data = {}
+# grouped_res = itertools.groupby(res, lambda x: x[0])
+# for key, group in grouped_res:
+# results = list(group)
+# sum_res = sum([r[1] for r in results])
+# mean = sum_res/len(results)
+# sum_sq = sum([(r[1] - mean) ** 2 for r in results])
+# if len(results) > 1:
+# dev = (sum_sq / (len(results) - 1))
+# else:
+# dev = 0
+# data[key] = (mean, dev)
+# return data
diff --git a/koder.yaml b/koder.yaml
index 28a8829..ab8e51b 100644
--- a/koder.yaml
+++ b/koder.yaml
@@ -1,6 +1,6 @@
clouds:
fuel:
- id: 3
+ ext_ip: 172.16.53.3
url: http://172.16.52.112:8000/
creds: admin:admin@admin
ssh_creds: root:test37
@@ -17,6 +17,9 @@
# explicit_nodes:
# "ssh://192.168.152.43": testnode
+internal:
+ var_dir_root: /tmp/perf_tests
+
sensors:
receiver_uri: udp://192.168.152.1:5699
roles_mapping:
@@ -28,23 +31,20 @@
openstack:
creds: clouds
vm_params:
- count: x1
+ count: x2
img_name: disk_io_perf
flavor_name: disk_io_perf.256
keypair_name: disk_io_perf
network_zone_name: novanetwork
flt_ip_pool: nova
creds: "ssh://ubuntu@{0}::disk_io_perf.pem"
+ name_templ: disk_io_perf-{0}
- internal_tests:
+ tests:
- io:
cfg: tests/io_task_test.cfg
params:
- SOME_OPT: 12
FILENAME: /tmp/xxx.bin
logging:
extra_logs: 1
-
-charts_img_path: tmp/charts
-output_dest: results.html
diff --git a/report.py b/report.py
index c4b62dd..95851e0 100644
--- a/report.py
+++ b/report.py
@@ -1,12 +1,12 @@
import argparse
from collections import OrderedDict
-import itertools
-import math
-import re
-from chart import charts
+
import formatters
+from chart import charts
+from statistic import med_dev
from utils import ssize_to_b
+from disk_perf_test_tool.tests.io_results_loader import parse_output
OPERATIONS = (('async', ('randwrite asynchronous', 'randread asynchronous',
@@ -167,35 +167,25 @@
render_html(bars + lines, dest)
-def calc_dev(l):
- sum_res = sum(l)
- mean = sum_res/len(l)
- sum_sq = sum([(r - mean) ** 2 for r in l])
- if len(l) > 1:
- return math.sqrt(sum_sq / (len(l) - 1))
- else:
- return 0
-
-
def main():
- from tests.disk_test_agent import parse_output
out = parse_output(
open("results/io_scenario_check_th_count.txt").read()).next()
results = out['res']
charts_url = []
charts_data = {}
+
for test_name, test_res in results.items():
+
blocksize = test_res['blocksize']
op_type = "sync" if test_res['sync'] else "direct"
chart_name = "Block size: %s %s" % (blocksize, op_type)
- lat = sum(test_res['lat']) / len(test_res['lat']) / 1000
- lat_dev = calc_dev(test_res['lat'])
- iops = sum(test_res['iops']) / len(test_res['iops'])
- iops_dev = calc_dev(test_res['iops'])
- bw = sum(test_res['bw_mean']) / len(test_res['bw_mean'])
- bw_dev = calc_dev(test_res['bw_mean'])
+
+ lat, lat_dev = med_dev(test_res['lat'])
+ iops, iops_dev = med_dev(test_res['iops'])
+ bw, bw_dev = med_dev(test_res['bw_mean'])
conc = test_res['concurence']
+
vals = ((lat, lat_dev), (iops, iops_dev), (bw, bw_dev))
charts_data.setdefault(chart_name, {})[conc] = vals
diff --git a/requirements-web.txt b/requirements-web.txt
index 6a56193..55fba09 100644
--- a/requirements-web.txt
+++ b/requirements-web.txt
@@ -20,7 +20,6 @@
oslo.utils==1.3.0
paramiko==1.15.2
pbr==0.10.7
-petname==1.7
prettytable==0.7.2
pyOpenSSL==0.14
python-cinderclient==1.1.1
diff --git a/requirements.txt b/requirements.txt
index 9ed2bfb..4412532 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,8 +4,8 @@
iso8601==0.1.10
netaddr==0.7.13
paramiko==1.15.2
-#petname==1.7
-#prettytable==0.7.2
+petname==1.7
+prettytable==0.7.2
pyOpenSSL==0.14
python-cinderclient
python-glanceclient
diff --git a/run_test.py b/run_test.py
index aa47b21..1841bc8 100755
--- a/run_test.py
+++ b/run_test.py
@@ -1,5 +1,4 @@
import os
-import pickle
import sys
import json
import Queue
@@ -10,19 +9,20 @@
import threading
import collections
+import yaml
from concurrent.futures import ThreadPoolExecutor
-import report
-# import formatters
-
import utils
+import report
import ssh_utils
import start_vms
+import pretty_yaml
from nodes import discover
from nodes.node import Node
-from config import cfg_dict, parse_config
+from config import cfg_dict, load_config
from tests.itest import IOPerfTest, PgBenchTest
from sensors.api import start_monitoring
+from formatters import format_results_for_console
logger = logging.getLogger("io-perf-tool")
@@ -133,10 +133,11 @@
th.join()
results = []
+
while not res_q.empty():
results.append(res_q.get())
- # logger.info("Get test result {0!r}".format(results[-1]))
- yield name, results
+
+ yield name, test.merge_results(results)
def parse_args(argv):
@@ -152,6 +153,7 @@
parser.add_argument("-i", '--build_id', type=str, default="id")
parser.add_argument("-t", '--build_type', type=str, default="GA")
parser.add_argument("-u", '--username', type=str, default="admin")
+ parser.add_argument("-p", '--post-process-only', default=None)
parser.add_argument("-o", '--output-dest', nargs="*")
parser.add_argument("config_file", nargs="?", default="config.yaml")
@@ -262,7 +264,7 @@
os_nodes_ids.append(node_id)
new_nodes.append(new_node)
- store_nodes_in_log(os_nodes_ids)
+ store_nodes_in_log(cfg, os_nodes_ids)
ctx.openstack_nodes_ids = os_nodes_ids
connect_all(new_nodes)
@@ -272,18 +274,9 @@
def shut_down_vms_stage(cfg, ctx):
+ vm_ids_fname = cfg_dict['vm_ids_fname']
if ctx.openstack_nodes_ids is None:
- data = open('vm_journal.log').read().strip()
-
- if data == "":
- logger.info("Journal file is empty")
- return
-
- try:
- nodes_ids = pickle.loads(data)
- except:
- logger.error("File vm_journal.log corrupted")
- return
+ nodes_ids = open(vm_ids_fname).read().split()
else:
nodes_ids = ctx.openstack_nodes_ids
@@ -291,16 +284,18 @@
start_vms.clear_nodes(nodes_ids)
logger.info("Nodes has been removed")
+ if os.path.exists(vm_ids_fname):
+ os.remove(vm_ids_fname)
-def store_nodes_in_log(nodes_ids):
- with open('vm_journal.log', 'w+') as f:
- f.write(pickle.dumps([nodes_ids]))
+
+def store_nodes_in_log(cfg, nodes_ids):
+ with open(cfg['vm_ids_fname'], 'w') as fd:
+ fd.write("\n".join(nodes_ids))
def clear_enviroment(cfg, ctx):
- if os.path.exists('vm_journal.log'):
+ if os.path.exists(cfg_dict['vm_ids_fname']):
shut_down_vms_stage(cfg, ctx)
- os.remove('vm_journal.log')
def run_tests_stage(cfg, ctx):
@@ -316,6 +311,44 @@
node.connection.close()
+def yamable(data):
+ if isinstance(data, (tuple, list)):
+ return map(yamable, data)
+
+ if isinstance(data, unicode):
+ return str(data)
+
+ if isinstance(data, dict):
+ res = {}
+ for k, v in data.items():
+ res[yamable(k)] = yamable(v)
+ return res
+
+ return data
+
+
+def store_raw_results_stage(cfg, ctx):
+
+ raw_results = os.path.join(cfg_dict['var_dir'], 'raw_results.yaml')
+
+ if os.path.exists(raw_results):
+ cont = yaml.load(open(raw_results).read())
+ else:
+ cont = []
+
+ cont.extend(yamable(ctx.results))
+ raw_data = pretty_yaml.dumps(cont)
+
+ with open(raw_results, "w") as fd:
+ fd.write(raw_data)
+
+
+def console_report_stage(cfg, ctx):
+ for tp, data in ctx.results:
+ if 'io' == tp:
+ print format_results_for_console(data)
+
+
def report_stage(cfg, ctx):
output_dest = cfg.get('output_dest')
@@ -340,29 +373,41 @@
logger.debug(str(node))
-def load_config(path):
- global cfg_dict
- cfg_dict = parse_config(path)
+def load_data_from(var_dir):
+ def closure(cfg, ctx):
+ raw_results = os.path.join(var_dir, 'raw_results.yaml')
+ ctx.results = yaml.load(open(raw_results).read())
+ return closure
def main(argv):
opts = parse_args(argv)
+ if opts.post_process_only is not None:
+ stages = [
+ load_data_from(opts.post_process_only),
+ console_report_stage,
+ # report_stage
+ ]
+ else:
+ stages = [
+ discover_stage,
+ log_nodes_statistic,
+ complete_log_nodes_statistic,
+ connect_stage,
+ complete_log_nodes_statistic,
+ deploy_sensors_stage,
+ run_tests_stage,
+ store_raw_results_stage,
+ console_report_stage,
+ report_stage
+ ]
+
level = logging.DEBUG if opts.extra_logs else logging.WARNING
setup_logger(logger, level)
- stages = [
- discover_stage,
- log_nodes_statistic,
- complete_log_nodes_statistic,
- connect_stage,
- # complete_log_nodes_statistic,
- deploy_sensors_stage,
- run_tests_stage,
- report_stage
- ]
-
load_config(opts.config_file)
+ logger.info("Store all info into {0}".format(cfg_dict['var_dir']))
ctx = Context()
ctx.build_meta['build_id'] = opts.build_id
@@ -373,7 +418,6 @@
try:
for stage in stages:
logger.info("Start {0.__name__} stage".format(stage))
- print "Start {0.__name__} stage".format(stage)
stage(cfg_dict, ctx)
finally:
exc, cls, tb = sys.exc_info()
diff --git a/start_vms.py b/start_vms.py
index 821c299..db25665 100644
--- a/start_vms.py
+++ b/start_vms.py
@@ -110,6 +110,8 @@
srv_count = len([srv for srv in lst if srv.status == 'enabled'])
count = srv_count * int(count[1:])
+ msg_templ = "Will start {0} servers with next params: {1}"
+ logger.debug(msg_templ.format(count, ""))
# vm_creds = config['vm_params']['creds'] ?????
vm_creds = params.pop('creds')
diff --git a/statistic.py b/statistic.py
index 7ecdaa0..01b0cec 100644
--- a/statistic.py
+++ b/statistic.py
@@ -44,31 +44,38 @@
""" x, y - test data, xnew - dots, where we want find approximation
if not relative_dist distance = y - newy
returns ynew - y values of linear approximation"""
+
# convert to numpy.array (don't work without it)
ox = array(x)
oy = array(y)
- # define function for initial value
- def get_init(x, y):
- """ create initial value for better work of leastsq """
- A = [[x[i], 1.0] for i in range(0, 2)]
- b = [y[i] for i in range(0, 2)]
- return tuple(linalg.solve(A, b))
+
# set approximation function
- funcLine = lambda tpl, x: tpl[0] * x + tpl[1]
+ def func_line(tpl, x):
+ return tpl[0] * x + tpl[1]
+
+ def error_func_rel(tpl, x, y):
+ return 1.0 - y / func_line(tpl, x)
+
+ def error_func_abs(tpl, x, y):
+ return y - func_line(tpl, x)
+
# choose distance mode
- if relative_dist:
- ErrorFunc = lambda tpl, x, y: 1.0 - y/funcLine(tpl, x)
- else:
- ErrorFunc = lambda tpl, x, y: y - funcLine(tpl, x)
- # choose initial value
- tplInitial = get_init(ox, oy)
+ error_func = error_func_rel if relative_dist else error_func_abs
+
+ tpl_initial = tuple(linalg.solve([[ox[0], 1.0], [ox[1], 1.0]],
+ oy[:2]))
+
# find line
- tplFinal, success = leastsq(ErrorFunc, tplInitial[:], args=(ox, oy))
+ tpl_final, success = leastsq(error_func,
+ tpl_initial[:],
+ args=(ox, oy))
+
# if error
if success not in range(1, 5):
raise ValueError("No line for this dots")
+
# return new dots
- return funcLine(tplFinal, array(xnew))
+ return func_line(tpl_final, array(xnew))
def difference(y, ynew):
@@ -79,31 +86,25 @@
[(abs dif, rel dif) * len(y)],
(abs average, abs max),
(rel average, rel max)"""
- da_sum = 0.0
- dr_sum = 0.0
- da_max = 0.0
- dr_max = 0.0
- dlist = []
+
+ abs_dlist = []
+ rel_dlist = []
+
for y1, y2 in zip(y, ynew):
# absolute
- da = y1 - y2
- da_sum += abs(da)
- if abs(da) > da_max:
- da_max = da
- # relative
- if y1 != 0:
- dr = abs(da / y1)
- dr_sum += dr
- if dr > dr_max:
- dr_max = dr
- else:
- dr = None
- # add to list
- dlist.append((da, dr))
- da_sum /= len(y)
- dr_sum /= len(y)
- return dlist, (da_sum, da_max), (dr_sum, dr_max)
+ abs_dlist.append(y1 - y2)
+ if y1 > 1E-6:
+ rel_dlist.append(abs(abs_dlist[-1] / y1))
+ else:
+ raise ZeroDivisionError("{0!r} is too small".format(y1))
+
+ da_avg = sum(abs_dlist) / len(abs_dlist)
+ dr_avg = sum(rel_dlist) / len(rel_dlist)
+
+ return (zip(abs_dlist, rel_dlist),
+ (da_avg, max(abs_dlist)), (dr_avg, max(rel_dlist))
+ )
def calculate_distribution_properties(data):
diff --git a/tests/disk_test_agent.py b/tests/disk_test_agent.py
index 8a0d165..5de3038 100644
--- a/tests/disk_test_agent.py
+++ b/tests/disk_test_agent.py
@@ -89,8 +89,15 @@
assert 'group_reporting' in processed_vals,\
group_report_err_msg
+ ramp_time = processed_vals.get('ramp_time')
+
for i in range(repeat):
yield name.format(**params), processed_vals.copy()
+ if 'ramp_time' in processed_vals:
+ del processed_vals['ramp_time']
+
+ if ramp_time is not None:
+ processed_vals['ramp_time'] = ramp_time
def calculate_execution_time(combinations):
@@ -203,11 +210,9 @@
raise
-def estimate_iops(sz, bw, lat):
- return 1 / (lat + float(sz) / bw)
-
-
def do_run_fio_fake(bconf):
+ def estimate_iops(sz, bw, lat):
+ return 1 / (lat + float(sz) / bw)
global count
count += 1
parsed_out = []
@@ -386,8 +391,8 @@
j_res["concurence"] = int(jconfig.get("numjobs", 1))
j_res["blocksize"] = jconfig["blocksize"]
j_res["jobname"] = job_output["jobname"]
- j_res["timings"] = (jconfig.get("runtime"),
- jconfig.get("ramp_time"))
+ j_res["timings"] = [int(jconfig.get("runtime", 0)),
+ int(jconfig.get("ramp_time", 0))]
else:
j_res = res[jname]
assert j_res["action"] == jconfig["rw"]
diff --git a/tests/io_scenario_check_distribution.cfg b/tests/io_scenario_check_distribution.cfg
new file mode 100644
index 0000000..6ba3f9f
--- /dev/null
+++ b/tests/io_scenario_check_distribution.cfg
@@ -0,0 +1,13 @@
+[distrubution_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+direct=1
+ramp_time=5
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+group_reporting
diff --git a/tests/io_scenario_hdd.cfg b/tests/io_scenario_hdd.cfg
new file mode 100644
index 0000000..3238503
--- /dev/null
+++ b/tests/io_scenario_hdd.cfg
@@ -0,0 +1,47 @@
+[defaults]
+wait_for_previous
+group_reporting
+time_based
+buffered=0
+iodepth=1
+
+filename={FILENAME}
+NUM_ROUNDS={NUM_ROUNDS}
+
+ramp_time=5
+size=10Gb
+runtime=30
+
+# ---------------------------------------------------------------------
+# check different thread count, sync mode. (latency, bw) = func(th_count)
+# ---------------------------------------------------------------------
+[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 4k %}
+rw=randwrite
+sync=1
+numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+
+# ---------------------------------------------------------------------
+# check different thread count, direct read mode. (latency, bw) = func(th_count)
+# ---------------------------------------------------------------------
+[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 4k %}
+rw=randread
+direct=1
+numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+
+# ---------------------------------------------------------------------
+# check IOPS read/write. (latency, bw) = func(th_count)
+# ---------------------------------------------------------------------
+[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw={% randwrite, randread %}
+direct=1
+
+# ---------------------------------------------------------------------
+# check BW for seq read/write. (latency, bw) = func(th_count)
+# ---------------------------------------------------------------------
+[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=1m
+rw={% write, read %}
+direct=1
diff --git a/tests/io_task_test.cfg b/tests/io_task_test.cfg
index 4d78578..24d62a9 100644
--- a/tests/io_task_test.cfg
+++ b/tests/io_task_test.cfg
@@ -1,14 +1,29 @@
-[writetest * 3]
+[writetest * 7]
group_reporting
numjobs=4
wait_for_previous
-ramp_time=5
+ramp_time=15
blocksize=4k
-filename=/tmp/xxx.bin
+filename={FILENAME}
rw=randwrite
direct=1
buffered=0
iodepth=1
-size=100Mb
-runtime=10
+size=1000Mb
+runtime=30
+time_based
+
+[readtest * 7]
+group_reporting
+numjobs=4
+wait_for_previous
+ramp_time=15
+blocksize=4k
+filename={FILENAME}
+rw=randread
+direct=1
+buffered=0
+iodepth=1
+size=1000Mb
+runtime=30
time_based
diff --git a/tests/itest.py b/tests/itest.py
index 53c4af3..a048703 100644
--- a/tests/itest.py
+++ b/tests/itest.py
@@ -1,10 +1,10 @@
-import re
import abc
-import json
+import time
import os.path
import logging
from disk_perf_test_tool.tests import disk_test_agent
+from disk_perf_test_tool.tests.disk_test_agent import parse_fio_config_full
from disk_perf_test_tool.tests.io_results_loader import parse_output
from disk_perf_test_tool.ssh_utils import copy_paths
from disk_perf_test_tool.utils import run_over_ssh, ssize_to_b
@@ -51,17 +51,15 @@
def pre_run(self, conn):
remote_script = self.copy_script(conn, self.pre_run_script)
cmd = remote_script
- code, out_err = run_over_ssh(conn, cmd)
- if code != 0:
- raise Exception("Pre run failed. %s" % out_err)
+ run_over_ssh(conn, cmd)
def run(self, conn, barrier):
remote_script = self.copy_script(conn, self.run_script)
cmd_opts = ' '.join(["%s %s" % (key, val) for key, val
in self.opts.items()])
cmd = remote_script + ' ' + cmd_opts
- code, out_err = run_over_ssh(conn, cmd)
- self.on_result(code, out_err, cmd)
+ out_err = run_over_ssh(conn, cmd)
+ self.on_result(out_err, cmd)
def parse_results(self, out):
for line in out.split("\n"):
@@ -69,16 +67,12 @@
if key and value:
self.on_result_cb((key, float(value)))
- def on_result(self, code, out_err, cmd):
- if 0 == code:
- try:
- self.parse_results(out_err)
- except Exception as exc:
- msg_templ = "Error during postprocessing results: {0!r}"
- raise RuntimeError(msg_templ.format(exc.message))
- else:
- templ = "Command {0!r} failed with code {1}. Error output is:\n{2}"
- logger.error(templ.format(cmd, code, out_err))
+ def on_result(self, out_err, cmd):
+ try:
+ self.parse_results(out_err)
+ except Exception as exc:
+ msg_templ = "Error during postprocessing results: {0!r}. {1}"
+ raise RuntimeError(msg_templ.format(exc.message, out_err))
class PgBenchTest(TwoScriptTest):
@@ -102,14 +96,19 @@
self.config_params = test_options.get('params', {})
self.tool = test_options.get('tool', 'fio')
self.raw_cfg = open(self.config_fname).read()
-
- parse_func = disk_test_agent.parse_fio_config_full
- self.configs = parse_func(self.raw_cfg, self.config_params)
+ self.configs = parse_fio_config_full(self.raw_cfg, self.config_params)
def pre_run(self, conn):
# TODO: install fio, if not installed
- run_over_ssh(conn, "apt-get -y install fio")
+ cmd = "sudo apt-get -y install fio"
+
+ for i in range(3):
+ try:
+ run_over_ssh(conn, cmd)
+ break
+ except OSError:
+ time.sleep(3)
local_fname = disk_test_agent.__file__.rsplit('.')[0] + ".py"
self.files_to_copy = {local_fname: self.io_py_remote}
@@ -123,30 +122,64 @@
msz += 1
cmd = cmd_templ.format(params['filename'], 1024 ** 2, msz)
- code, out_err = run_over_ssh(conn, cmd)
-
- if code != 0:
- raise RuntimeError("Preparation failed " + out_err)
+ run_over_ssh(conn, cmd)
def run(self, conn, barrier):
- cmd_templ = "env python2 {0} --type {1} --json -"
- cmd = cmd_templ.format(self.io_py_remote, self.tool)
- logger.debug("Run {0}".format(cmd))
+ cmd_templ = "env python2 {0} --type {1} {2} --json -"
+
+ params = " ".join("{0}={1}".format(k, v)
+ for k, v in self.config_params.items())
+
+ if "" != params:
+ params = "--params " + params
+
+ cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
+ logger.debug("Waiting on barrier")
try:
barrier.wait()
- code, out_err = run_over_ssh(conn, cmd, stdin_data=self.raw_cfg)
- self.on_result(code, out_err, cmd)
+ logger.debug("Run {0}".format(cmd))
+ out_err = run_over_ssh(conn, cmd, stdin_data=self.raw_cfg)
+ self.on_result(out_err, cmd)
finally:
barrier.exit()
- def on_result(self, code, out_err, cmd):
- if 0 == code:
- try:
- for data in parse_output(out_err):
- self.on_result_cb(data)
- except Exception as exc:
- msg_templ = "Error during postprocessing results: {0!r}"
- raise RuntimeError(msg_templ.format(exc.message))
- else:
- templ = "Command {0!r} failed with code {1}. Output is:\n{2}"
- logger.error(templ.format(cmd, code, out_err))
+ def on_result(self, out_err, cmd):
+ try:
+ for data in parse_output(out_err):
+ self.on_result_cb(data)
+ except Exception as exc:
+ msg_templ = "Error during postprocessing results: {0!r}"
+ raise RuntimeError(msg_templ.format(exc.message))
+
+ def merge_results(self, results):
+ merged_result = results[0]
+ merged_data = merged_result['res']
+ expected_keys = set(merged_data.keys())
+ mergable_fields = ['bw_mean', 'clat', 'iops', 'lat', 'slat']
+
+ for res in results[1:]:
+ assert res['__meta__'] == merged_result['__meta__']
+
+ data = res['res']
+ diff = set(data.keys()).symmetric_difference(expected_keys)
+
+ msg = "Difference: {0}".format(",".join(diff))
+ assert len(diff) == 0, msg
+
+ for testname, test_data in data.items():
+ res_test_data = merged_data[testname]
+
+ diff = set(test_data.keys()).symmetric_difference(
+ res_test_data.keys())
+
+ msg = "Difference: {0}".format(",".join(diff))
+ assert len(diff) == 0, msg
+
+ for k, v in test_data.items():
+ if k in mergable_fields:
+ res_test_data[k].extend(v)
+ else:
+ msg = "{0!r} != {1!r}".format(res_test_data[k], v)
+ assert res_test_data[k] == v, msg
+
+ return merged_result
diff --git a/utils.py b/utils.py
index 2e26207..11b38ae 100644
--- a/utils.py
+++ b/utils.py
@@ -64,7 +64,7 @@
raise
-def run_over_ssh(conn, cmd, stdin_data=None, exec_timeout=60):
+def run_over_ssh(conn, cmd, stdin_data=None, exec_timeout=5 * 60 * 60):
"should be replaces by normal implementation, with select"
transport = conn.get_transport()
session = transport.open_session()
@@ -89,13 +89,19 @@
break
except socket.timeout:
pass
+
if time.time() - stime > exec_timeout:
- return 1, output + "\nExecution timeout"
+ raise OSError(output + "\nExecution timeout")
+
code = session.recv_exit_status()
finally:
session.close()
- return code, output
+ if code != 0:
+ templ = "Cmd {0!r} failed with code {1}. Output: {2}"
+ raise OSError(templ.format(cmd, code, output))
+
+ return output
SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4)