Pre-release bug fixes
diff --git a/config.py b/config.py
index ab9ec8c..bb8fab3 100644
--- a/config.py
+++ b/config.py
@@ -23,9 +23,13 @@
cfg_dict['var_dir'] = dr
os.makedirs(cfg_dict['var_dir'])
- cfg_dict['charts_img_path'] = os.path.join(cfg_dict['var_dir'], 'charts')
+ def in_var_dir(fname):
+ return os.path.join(cfg_dict['var_dir'], fname)
+
+ cfg_dict['charts_img_path'] = in_var_dir('charts')
os.makedirs(cfg_dict['charts_img_path'])
- cfg_dict['vm_ids_fname'] = os.path.join(cfg_dict['var_dir'], 'os_vm_ids')
- cfg_dict['report'] = os.path.join(cfg_dict['var_dir'], 'report.html')
- cfg_dict['log_file'] = os.path.join(cfg_dict['var_dir'], 'log.txt')
+ cfg_dict['vm_ids_fname'] = in_var_dir('os_vm_ids')
+ cfg_dict['html_report_file'] = in_var_dir('report.html')
+ cfg_dict['text_report_file'] = in_var_dir('report.txt')
+ cfg_dict['log_file'] = in_var_dir('log.txt')
diff --git a/config.yaml b/config.yaml
index 8bb1fa2..b0e6cc1 100644
--- a/config.yaml
+++ b/config.yaml
@@ -1,90 +1,35 @@
clouds:
- fuel:
- url: http://172.16.52.112:8000
- ext_ip: 172.16.53.3
- creds: admin:admin@admin
- ssh_creds: root:test37
- openstack_env: test
+ fuel:
+ ext_ip: 172.16.53.3
+ url: http://172.16.52.112:8000/
+ creds: admin:admin@admin
+ ssh_creds: root:test37
+ openstack_env: test
discover: fuel
-explicit_nodes:
- "ssh://192.168.152.43": testnode
-
-sensors:
- receiver_uri: udp://192.168.152.1:5699
- roles_mapping:
- ceph-osd: block-io
- testnode: system-cpu, block-io
+internal:
+ var_dir_root: /tmp/perf_tests
tests:
- - io: tests/io_task_test.cfg
-
-logging:
- extra_logs: 1
-
-
-# # nodes to be started/detected
-#clouds:
-# # openstack: file:///tmp/openrc
-# # openstack: user:passwd:tenant, http://......
-# # ceph: ssh://root@172.16.53.2::/home/koder/.ssh/id_rsa.pub
-# fuel:
-# url: http://172.16.52.112:8000
-# creds: admin:admin:admin
-# ssh_creds: root:test37
-# openstack_env: test
-# ceph: local
-
-# # discover: fuel+openstack, ceph
-
-# explicit_nodes:
-# "ssh://192.168.152.43": testnode
-
-# # sensors to be installed, accordingly to role
-# sensors:
-# receiver_uri: udp://192.168.152.1:5699
-# roles_mapping:
-# ceph-osd: block-io #ceph-io, ceph-cpu, ceph-ram, ceph-net
-# # ceph-mon: ceph-io, ceph-cpu, ceph-ram, ceph-net
-# os-compute: io, net
-# test-vm: system-cpu
-
-# # tests to run
-tests: # $include(tests.yaml)
start_test_nodes:
openstack:
-
+ creds: clouds
vm_params:
- count: x1
+ count: x2
img_name: disk_io_perf
flavor_name: disk_io_perf.256
keypair_name: disk_io_perf
network_zone_name: novanetwork
flt_ip_pool: nova
creds: "ssh://ubuntu@{0}::disk_io_perf.pem"
- internal_tests:
- - pgbench:
- opts:
- num_clients: [4, 8, 12]
- transactions: [1, 2, 3]
+ name_templ: disk_io_perf-{0}
+ tests:
+ - io:
+ cfg: tests/io_scenario_hdd.cfg
+ params:
+ FILENAME: /opt/xxx.bin
+ NUM_ROUNDS: 7
-# - io: tests/io_task_test.cfg
-
-# # - vm_count:
-# # max_lat_ms: 20
-# # min_bw_mbps: 60
-# # min_4k_direct_w_iops: 100
-# # min_4k_direct_r_iops: 100
-
-# # where to store results
-# # results:
-# # - mos-linux-http://172.12.33.45
-# # - bson, /tmp/myrun.bson
-
-paths:
- basedir: "/home/gstepanov/rally-results-processor"
- TEST_PATH: "/home/gstepanov/rally-results-processor/test_results"
- CHARTS_IMG_PATH: "static/images"
- SQLALCHEMY_MIGRATE_REPO: "/home/gstepanov/rally-results-processor/db_repository"
- DATABASE_URI: 'sqlite:////home/gstepanov/rally-results-processor/app.db?check_same_thread=False'
+logging:
+ extra_logs: 1
\ No newline at end of file
diff --git a/formatters.py b/formatters.py
index bbbf8d2..967d8f3 100644
--- a/formatters.py
+++ b/formatters.py
@@ -1,61 +1,8 @@
-import itertools
-from collections import defaultdict
-
import texttable
+from utils import ssize_to_b
from statistic import med_dev
-# [{u'__meta__': {u'raw_cfg': u'[writetest * 2]\ngroup_reporting\nnumjobs=4\nwait_for_previous\nramp_time=5\nblocksize=4k\nfilename={FILENAME}\nrw=randwrite\ndirect=1\nbuffered=0\niodepth=1\nsize=100Mb\nruntime=10\ntime_based\n\n[readtest * 2]\ngroup_reporting\nnumjobs=4\nwait_for_previous\nramp_time=5\nblocksize=4k\nfilename={FILENAME}\nrw=randread\ndirect=1\nbuffered=0\niodepth=1\nsize=100Mb\nruntime=10\ntime_based\n'},
-# u'res': {u'readtest': {u'action': u'randread',
-# u'blocksize': u'4k',
-# u'bw_mean': [349.61, 276.54],
-# u'clat': [11987.16, 15235.08],
-# u'concurence': 4,
-# u'direct_io': True,
-# u'iops': [316, 251],
-# u'jobname': u'readtest',
-# u'lat': [11987.52, 15235.46],
-# u'slat': [0.0, 0.0],
-# u'sync': False,
-# u'timings': [u'10', u'5']},
-# u'writetest': {u'action': u'randwrite',
-# u'blocksize': u'4k',
-# u'bw_mean': [72.03, 61.84],
-# u'clat': [113525.86, 152836.42],
-# u'concurence': 4,
-# u'direct_io': True,
-# u'iops': [35, 23],
-# u'jobname': u'writetest',
-# u'lat': [113526.31, 152836.89],
-# u'slat': [0.0, 0.0],
-# u'sync': False,
-# u'timings': [u'10', u'5']}}},
-# {u'__meta__': {u'raw_cfg': u'[writetest * 2]\ngroup_reporting\nnumjobs=4\nwait_for_previous\nramp_time=5\nblocksize=4k\nfilename={FILENAME}\nrw=randwrite\ndirect=1\nbuffered=0\niodepth=1\nsize=100Mb\nruntime=10\ntime_based\n\n[readtest * 2]\ngroup_reporting\nnumjobs=4\nwait_for_previous\nramp_time=5\nblocksize=4k\nfilename={FILENAME}\nrw=randread\ndirect=1\nbuffered=0\niodepth=1\nsize=100Mb\nruntime=10\ntime_based\n'},
-# u'res': {u'readtest': {u'action': u'randread',
-# u'blocksize': u'4k',
-# u'bw_mean': [287.62, 280.76],
-# u'clat': [15437.57, 14741.65],
-# u'concurence': 4,
-# u'direct_io': True,
-# u'iops': [258, 271],
-# u'jobname': u'readtest',
-# u'lat': [15437.94, 14742.04],
-# u'slat': [0.0, 0.0],
-# u'sync': False,
-# u'timings': [u'10', u'5']},
-# u'writetest': {u'action': u'randwrite',
-# u'blocksize': u'4k',
-# u'bw_mean': [71.18, 61.62],
-# u'clat': [116382.95, 153486.81],
-# u'concurence': 4,
-# u'direct_io': True,
-# u'iops': [32, 22],
-# u'jobname': u'writetest',
-# u'lat': [116383.44, 153487.27],
-# u'slat': [0.0, 0.0],
-# u'sync': False,
-# u'timings': [u'10', u'5']}}}]
-
def get_test_descr(data):
rw = {"randread": "rr",
@@ -63,26 +10,38 @@
"read": "sr",
"write": "sw"}[data["action"]]
- if data["direct_io"]:
- sync_mode = 'd'
- elif data["sync"]:
- sync_mode = 's'
- else:
- sync_mode = 'a'
+ return "{0}{1}{2}_th{3}".format(rw,
+ data['sync_mode'],
+ data['blocksize'],
+ data['concurence'])
- th_count = int(data['concurence'])
- return "{0}{1}{2}_th{3}".format(rw, sync_mode,
- data['blocksize'], th_count)
+def key_func(k_data):
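+    # sort key for console output: (action, sync_mode, blocksize, threads)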
+ _, data = k_data
+
+ bsz = ssize_to_b(data['blocksize'])
+ tp = data['action']
+ return tp, data['sync_mode'], bsz, data['concurence']
def format_results_for_console(test_set):
data_for_print = []
tab = texttable.Texttable()
- tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER | tab.HLINES)
+ tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
tab.set_cols_align(["l", "r", "r", "r", "r"])
- for test_name, data in test_set['res'].items():
+ items = sorted(test_set['res'].items(), key=key_func)
+ prev_k = None
+
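+    # emit a "---" separator row whenever (action, sync_mode, blocksize)
+    # changes between consecutive sorted results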
+ for test_name, data in items:
+ curr_k = key_func((test_name, data))[:3]
+
+ if prev_k is not None:
+ if prev_k != curr_k:
+ data_for_print.append(["---"] * 5)
+
+ prev_k = curr_k
+
descr = get_test_descr(data)
iops, _ = med_dev(data['iops'])
@@ -96,33 +55,8 @@
data_for_print.append(params)
header = ["Description", "IOPS", "BW KBps", "Dev * 3 %", "LAT ms"]
- tab.add_row(header)
- tab.header = header
+ tab.header(header)
map(tab.add_row, data_for_print)
return tab.draw()
-
-
-# def format_pgbench_stat(res):
-# """
-# Receives results in format:
-# "<num_clients> <num_transactions>: <tps>
-# <num_clients> <num_transactions>: <tps>
-# ....
-# "
-# """
-# if res:
-# data = {}
-# grouped_res = itertools.groupby(res, lambda x: x[0])
-# for key, group in grouped_res:
-# results = list(group)
-# sum_res = sum([r[1] for r in results])
-# mean = sum_res/len(results)
-# sum_sq = sum([(r[1] - mean) ** 2 for r in results])
-# if len(results) > 1:
-# dev = (sum_sq / (len(results) - 1))
-# else:
-# dev = 0
-# data[key] = (mean, dev)
-# return data
diff --git a/io_results_loader.py b/io_results_loader.py
index 9c49e06..bf8a585 100644
--- a/io_results_loader.py
+++ b/io_results_loader.py
@@ -3,7 +3,7 @@
from utils import ssize_to_b
-import statistic as data_stat
+from statistic import med_dev
def parse_output(out_err):
@@ -41,15 +41,11 @@
data = list(parse_output(raw_data))[0]
for key, val in data['res'].items():
- if 'blocksize' not in val:
- val['blocksize'] = key.split('_')[2][3:].split('th')[0]
-
val['blocksize_b'] = ssize_to_b(val['blocksize'])
- val['iops_mediana'], val['iops_stddev'] = \
- data_stat.med_dev(val['iops'])
- val['bw_mediana'], val['bw_stddev'] = data_stat.med_dev(val['bw_mean'])
- val['lat_mediana'], val['lat_stddev'] = data_stat.med_dev(val['lat'])
+ val['iops_mediana'], val['iops_stddev'] = med_dev(val['iops'])
+ val['bw_mediana'], val['bw_stddev'] = med_dev(val['bw_mean'])
+ val['lat_mediana'], val['lat_stddev'] = med_dev(val['lat'])
yield val
diff --git a/keystone.py b/keystone.py
index 3ae5e7a..24d322c 100644
--- a/keystone.py
+++ b/keystone.py
@@ -1,8 +1,9 @@
-from functools import partial
import json
import urllib2
-from keystoneclient.v2_0 import Client as keystoneclient
+from functools import partial
+
from keystoneclient import exceptions
+from keystoneclient.v2_0 import Client as keystoneclient
class Urllib2HTTP(object):
diff --git a/logger.py b/logger.py
deleted file mode 100644
index bd4c6ef..0000000
--- a/logger.py
+++ /dev/null
@@ -1,19 +0,0 @@
-#!/usr/bin/env python
-""" Logger initialization """
-
-import logging
-
-
-def define_logger(name):
- """ Initialization of logger"""
- logger = logging.getLogger(name)
- logger.setLevel(logging.INFO)
- ch = logging.StreamHandler()
- ch.setLevel(logging.INFO)
- logger.addHandler(ch)
-
- log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
- formatter = logging.Formatter(log_format,
- "%H:%M:%S")
- ch.setFormatter(formatter)
- return logger
diff --git a/perf1.yaml b/perf1.yaml
new file mode 100644
index 0000000..861c254
--- /dev/null
+++ b/perf1.yaml
@@ -0,0 +1,52 @@
+clouds:
+ fuel:
+ ext_ip: 172.16.53.3
+ url: http://172.16.52.112:8000/
+ creds: admin:admin@admin
+ ssh_creds: root:test37
+ openstack_env: test
+
+ openstack:
+ OS_TENANT_NAME: admin
+ OS_USERNAME: admin
+ OS_PASSWORD: admin
+ OS_AUTH_URL: http://172.16.53.3:5000/v2.0/
+
+# discover: fuel
+discover:
+
+explicit_nodes:
+ "ssh://koder:koder771@@127.0.0.1": testnode
+
+internal:
+ var_dir_root: /tmp/perf_tests
+
+# sensors:
+# receiver_uri: udp://192.168.152.1:5699
+# roles_mapping:
+# ceph-osd: block-io
+# testnode: system-cpu, block-io
+
+tests:
+ # start_test_nodes:
+ # openstack:
+ # creds: clouds
+ # vm_params:
+ # count: x2
+ # img_name: disk_io_perf
+ # flavor_name: disk_io_perf.256
+ # keypair_name: disk_io_perf
+ # network_zone_name: novanetwork
+ # flt_ip_pool: nova
+ # creds: "ssh://ubuntu@{0}::disk_io_perf.pem"
+ # name_templ: disk_io_perf-{0}
+
+ - io:
+ # cfg: scripts/fio_tests_configs/io_task_test.cfg
+ cfg: tests/io_scenario_hdd.cfg
+ params:
+ FILENAME: /media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+ NUM_ROUNDS: 7
+
+logging:
+ extra_logs: 1
\ No newline at end of file
diff --git a/report.py b/report.py
index 95851e0..bce6690 100644
--- a/report.py
+++ b/report.py
@@ -1,72 +1,68 @@
-import argparse
+import sys
from collections import OrderedDict
+import matplotlib.pyplot as plt
import formatters
from chart import charts
-from statistic import med_dev
from utils import ssize_to_b
-from disk_perf_test_tool.tests.io_results_loader import parse_output
+from statistic import med_dev, approximate_curve
+
+from disk_perf_test_tool.tests.io_results_loader import (load_files,
+ filter_data)
OPERATIONS = (('async', ('randwrite asynchronous', 'randread asynchronous',
'write asynchronous', 'read asynchronous')),
('sync', ('randwrite synchronous', 'randread synchronous',
- 'write synchronous', 'read synchronous')))
+ 'write synchronous', 'read synchronous')),
+ ('direct', ('randwrite direct', 'randread direct',
+ 'write direct', 'read direct')))
sync_async_view = {'s': 'synchronous',
- 'a': 'asynchronous'}
+ 'a': 'asynchronous',
+ 'd': 'direct'}
-def parse_args(argv):
- parser = argparse.ArgumentParser()
- parser.add_argument('-s', '--storage', help='storage location', dest="url")
- parser.add_argument('-e', '--email', help='user email',
- default="aaa@gmail.com")
- parser.add_argument('-p', '--password', help='user password',
- default="1234")
- return parser.parse_args(argv)
+# def pgbench_chart_data(results):
+# """
+# Format pgbench results for chart
+# """
+# data = {}
+# charts_url = []
+# formatted_res = formatters.format_pgbench_stat(results)
+# for key, value in formatted_res.items():
+# num_cl, num_tr = key.split(' ')
+# data.setdefault(num_cl, {}).setdefault(build, {})
+# data[keys[z]][build][
+# ' '.join(keys)] = value
-def pgbench_chart_data(results):
- """
- Format pgbench results for chart
- """
- data = {}
- charts_url = []
+# for name, value in data.items():
+# title = name
+# legend = []
+# dataset = []
- formatted_res = formatters.format_pgbench_stat(results)
- for key, value in formatted_res.items():
- num_cl, num_tr = key.split(' ')
- data.setdefault(num_cl, {}).setdefault(build, {})
- data[keys[z]][build][
- ' '.join(keys)] = value
+# scale_x = []
- for name, value in data.items():
- title = name
- legend = []
- dataset = []
+# for build_id, build_results in value.items():
+# vals = []
+# OD = OrderedDict
+# ordered_build_results = OD(sorted(build_results.items(),
+# key=lambda t: t[0]))
+# scale_x = ordered_build_results.keys()
+# for key in scale_x:
+# res = build_results.get(key)
+# if res:
+# vals.append(res)
+# if vals:
+# dataset.append(vals)
+# legend.append(build_id)
- scale_x = []
-
- for build_id, build_results in value.items():
- vals = []
- OD = OrderedDict
- ordered_build_results = OD(sorted(build_results.items(),
- key=lambda t: t[0]))
- scale_x = ordered_build_results.keys()
- for key in scale_x:
- res = build_results.get(key)
- if res:
- vals.append(res)
- if vals:
- dataset.append(vals)
- legend.append(build_id)
-
- if dataset:
- charts_url.append(str(charts.render_vertical_bar
- (title, legend, dataset, scale_x=scale_x)))
- return charts_url
+# if dataset:
+# charts_url.append(str(charts.render_vertical_bar
+# (title, legend, dataset, scale_x=scale_x)))
+# return charts_url
def build_vertical_bar(results, z=0):
@@ -155,56 +151,86 @@
pass
-def render_html_results(ctx, dest):
- charts = []
- for res in ctx.results:
- if res[0] == "io":
- charts.append(build_io_chart(res))
+# def render_html_results(ctx):
+# charts = []
+# for res in ctx.results:
+# if res[0] == "io":
+# charts.append(build_io_chart(res))
- bars = build_vertical_bar(ctx.results)
- lines = build_lines_chart(ctx.results)
+# bars = build_vertical_bar(ctx.results)
+# lines = build_lines_chart(ctx.results)
- render_html(bars + lines, dest)
+ # render_html(bars + lines, dest)
-def main():
- out = parse_output(
- open("results/io_scenario_check_th_count.txt").read()).next()
- results = out['res']
+def make_io_report(results):
+ for suite_type, test_suite_data in results:
+ if suite_type != 'io':
+ continue
- charts_url = []
- charts_data = {}
+ io_test_suite_res = test_suite_data['res']
- for test_name, test_res in results.items():
+ charts_url = []
- blocksize = test_res['blocksize']
- op_type = "sync" if test_res['sync'] else "direct"
- chart_name = "Block size: %s %s" % (blocksize, op_type)
+ name_filters = [
+ #('hdd_test_rws4k', ('concurence', 'lat', 'iops')),
+ #('hdd_test_rrs4k', ('concurence', 'lat', 'iops')),
+ ('hdd_test_rrd4k', ('concurence', 'lat', 'iops')),
+ ('hdd_test_swd1m', ('concurence', 'lat', 'bw_mean')),
+ ]
- lat, lat_dev = med_dev(test_res['lat'])
- iops, iops_dev = med_dev(test_res['iops'])
- bw, bw_dev = med_dev(test_res['bw_mean'])
- conc = test_res['concurence']
+ for name_filter, fields in name_filters:
+ th_filter = filter_data(name_filter, fields)
- vals = ((lat, lat_dev), (iops, iops_dev), (bw, bw_dev))
- charts_data.setdefault(chart_name, {})[conc] = vals
+ data_iter = sorted(th_filter(io_test_suite_res.values()))
- for chart_name, chart_data in charts_data.items():
- legend = ["bw"]
- ordered_data = OrderedDict(sorted(chart_data.items(),
- key=lambda t: t[0]))
+ concurence, latv, iops_or_bw_v = zip(*data_iter)
+ iops_or_bw_v, iops_or_bw_dev_v = zip(*map(med_dev, iops_or_bw_v))
- lat_d, iops_d, bw_d = zip(*ordered_data.values())
- bw_sum = [vals[2][0] * conc for conc, vals in ordered_data.items()]
+ _, ax1 = plt.subplots()
- chart_url = str(charts.render_vertical_bar(
- chart_name, legend, [bw_d], label_x="KBps",
- scale_x=ordered_data.keys(),
- lines=[(zip(*lat_d)[0], 'msec', 'rr', 'lat'), (bw_sum, None, None, 'bw_sum')]))
- charts_url.append(chart_url)
+ ax1.plot(concurence, iops_or_bw_v)
+ ax1.errorbar(concurence, iops_or_bw_v, iops_or_bw_dev_v,
+ linestyle='None',
+ label="iops_or_bw_v",
+ marker="*")
+
+ # ynew = approximate_line(ax, ay, ax, True)
+
+ ax2 = ax1.twinx()
+
+ ax2.errorbar(concurence,
+ [med_dev(lat)[0] / 1000 for lat in latv],
+ [med_dev(lat)[1] / 1000 for lat in latv],
+ linestyle='None',
+ label="iops_or_bw_v",
+ marker="*")
+ ax2.plot(concurence, [med_dev(lat)[0] / 1000 for lat in latv])
+ plt.show()
+ exit(0)
+
+ # bw_only = []
+
+ # for conc, _, _, (bw, _) in data:
+ # bw_only.append(bw)
+ # bw_d_per_th.append((bw / conc, 0))
+
+ # lines = [(zip(*lat_d)[0], 'msec', 'rr', 'lat'), (bw_sum, None, None, 'bw_sum')]
+
+ # chart_url = charts.render_vertical_bar(
+ # chart_name, ["bw"], [bw_d_per_th], label_x="KBps",
+ # scale_x=ordered_data.keys(),
+ # lines=lines)
+
+ # charts_url.append(str(chart_url))
+
render_html(charts_url, "results.html")
+
+
+def main(args):
+    make_io_report(load_files(args[1:]))
return 0
if __name__ == '__main__':
- exit(main())
+ exit(main(sys.argv))
diff --git a/run_test.py b/run_test.py
index 2e01394..60ffbbc 100755
--- a/run_test.py
+++ b/run_test.py
@@ -1,11 +1,9 @@
import os
import sys
-import json
import Queue
import pprint
import logging
import argparse
-import traceback
import threading
import collections
@@ -20,27 +18,65 @@
from nodes import discover
from nodes.node import Node
from config import cfg_dict, load_config
-from tests.itest import IOPerfTest, PgBenchTest
from sensors.api import start_monitoring
+from tests.itest import IOPerfTest, PgBenchTest
from formatters import format_results_for_console
logger = logging.getLogger("io-perf-tool")
+def color_me(color):
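+    # ANSI SGR escapes: 30 + color selects a standard foreground color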
+ RESET_SEQ = "\033[0m"
+ COLOR_SEQ = "\033[1;%dm"
+
+ color_seq = COLOR_SEQ % (30 + color)
+
+ def closure(msg):
+ return color_seq + msg + RESET_SEQ
+ return closure
+
+
+class ColoredFormatter(logging.Formatter):
+ BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
+
+ colors = {
+ 'WARNING': color_me(YELLOW),
+ 'DEBUG': color_me(BLUE),
+ 'CRITICAL': color_me(YELLOW),
+ 'ERROR': color_me(RED)
+ }
+
+ def __init__(self, msg, use_color=True):
+ logging.Formatter.__init__(self, msg)
+ self.use_color = use_color
+
+ def format(self, record):
+ levelname = record.levelname
+
+ if levelname in self.colors:
+ record.levelname = self.colors[levelname](levelname)
+
+ return logging.Formatter.format(self, record)
+
+
def setup_logger(logger, level=logging.DEBUG, log_fname=None):
- # logger.setLevel(level)
+ logger.setLevel(logging.DEBUG)
sh = logging.StreamHandler()
sh.setLevel(level)
log_format = '%(asctime)s - %(levelname)-6s - %(name)s - %(message)s'
+ colored_formatter = ColoredFormatter(log_format,
+ "%H:%M:%S")
+
formatter = logging.Formatter(log_format,
"%H:%M:%S")
- sh.setFormatter(formatter)
+ sh.setFormatter(colored_formatter)
logger.addHandler(sh)
if log_fname is not None:
fh = logging.FileHandler(log_fname)
+ fh.setFormatter(formatter)
fh.setLevel(logging.DEBUG)
logger.addHandler(fh)
@@ -71,6 +107,7 @@
raise ValueError("Unknown url type {0}".format(node.conn_url))
except Exception:
logger.exception("During connect to {0}".format(node))
+ raise
def connect_all(nodes):
@@ -86,21 +123,21 @@
while True:
val = q.get()
if val is None:
- print sensor_data
q.put(sensor_data)
break
sensor_data.append(val)
logger.info("Sensors thread exits")
-def test_thread(test, node, barrier):
+def test_thread(test, node, barrier, res_q):
try:
logger.debug("Run preparation for {0}".format(node.conn_url))
test.pre_run(node.connection)
logger.debug("Run test for {0}".format(node.conn_url))
test.run(node.connection, barrier)
- except:
+ except Exception as exc:
logger.exception("In test {0} for node {1}".format(test, node))
+ res_q.put(exc)
def run_tests(config, nodes):
@@ -114,32 +151,44 @@
res_q = Queue.Queue()
- for test in config['tests']:
- for test in config['tests'][test]['tests']:
- for name, params in test.items():
- logger.info("Starting {0} tests".format(name))
+ for test_block in config:
+ for name, params in test_block.items():
+ logger.info("Starting {0} tests".format(name))
- threads = []
- barrier = utils.Barrier(len(test_nodes))
- for node in test_nodes:
- msg = "Starting {0} test on {1} node"
- logger.debug(msg.format(name, node.conn_url))
- test = tool_type_mapper[name](params, res_q.put)
- th = threading.Thread(None, test_thread, None,
- (test, node, barrier))
- threads.append(th)
- th.daemon = True
- th.start()
+ threads = []
+ barrier = utils.Barrier(len(test_nodes))
+ for node in test_nodes:
+ msg = "Starting {0} test on {1} node"
+ logger.debug(msg.format(name, node.conn_url))
+ test = tool_type_mapper[name](params, res_q.put)
+ th = threading.Thread(None, test_thread, None,
+ (test, node, barrier, res_q))
+ threads.append(th)
+ th.daemon = True
+ th.start()
- for th in threads:
- th.join()
-
- results = []
-
+ def gather_results(res_q, results):
while not res_q.empty():
- results.append(res_q.get())
+ val = res_q.get()
- yield name, test.merge_results(results)
+ if isinstance(val, Exception):
+ msg = "Exception during test execution: {0}"
+ raise ValueError(msg.format(val.message))
+
+ results.append(val)
+
+ results = []
+
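+    # poll threads with a short join timeout so res_q is drained while the
+    # tests are still running, instead of only after all threads finish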
+ while True:
+ for th in threads:
+ th.join(1)
+ gather_results(res_q, results)
+
+ if all(not th.is_alive() for th in threads):
+ break
+
+ gather_results(res_q, results)
+ yield name, test.merge_results(results)
def parse_args(argv):
@@ -184,7 +233,7 @@
def discover_stage(cfg, ctx):
- if 'discover' in cfg:
+ if cfg.get('discover') is not None:
discover_objs = [i.strip() for i in cfg['discover'].strip().split(",")]
ctx.nodes.extend(discover.discover(discover_objs, cfg['clouds']))
@@ -193,10 +242,10 @@
def deploy_sensors_stage(cfg_dict, ctx):
- ctx.clear_calls_stack.append(remove_sensors_stage)
if 'sensors' not in cfg_dict:
return
+ ctx.clear_calls_stack.append(remove_sensors_stage)
cfg = cfg_dict.get('sensors')
sens_cfg = []
@@ -272,7 +321,7 @@
connect_all(new_nodes)
if 'tests' in cfg:
- ctx.results.extend(run_tests(cfg_dict, ctx.nodes))
+ ctx.results.extend(run_tests(cfg['tests'], ctx.nodes))
def shut_down_vms_stage(cfg, ctx):
@@ -282,9 +331,10 @@
else:
nodes_ids = ctx.openstack_nodes_ids
- logger.info("Removing nodes")
- start_vms.clear_nodes(nodes_ids)
- logger.info("Nodes has been removed")
+ if len(nodes_ids) != 0:
+ logger.info("Removing nodes")
+ start_vms.clear_nodes(nodes_ids)
+ logger.info("Nodes has been removed")
if os.path.exists(vm_ids_fname):
os.remove(vm_ids_fname)
@@ -308,6 +358,8 @@
def disconnect_stage(cfg, ctx):
+ ssh_utils.close_all_sessions()
+
for node in ctx.nodes:
if node.connection is not None:
node.connection.close()
@@ -352,21 +404,23 @@
def report_stage(cfg, ctx):
- output_dest = cfg.get('output_dest')
+ # html_report = report.make_io_report(ctx.results)
+ # html_rep_fname = cfg['html_report_file']
- if output_dest is not None:
- if output_dest.endswith(".html"):
- report.render_html_results(ctx, output_dest)
- logger.info("Results were stored in %s" % output_dest)
- else:
- with open(output_dest, "w") as fd:
- data = {"sensor_data": ctx.sensor_data,
- "results": ctx.results}
- fd.write(json.dumps(data))
- else:
- print "=" * 20 + " RESULTS " + "=" * 20
- pprint.pprint(ctx.results)
- print "=" * 60
+ # with open(html_rep_fname, "w") as fd:
+ # fd.write(html_report)
+
+ # logger.info("Html report were stored in " + html_rep_fname)
+
+ text_rep_fname = cfg_dict['text_report_file']
+ with open(text_rep_fname, "w") as fd:
+ for tp, data in ctx.results:
+ if 'io' == tp:
+ fd.write(format_results_for_console(data))
+ fd.write("\n")
+ fd.flush()
+
+ logger.info("Text report were stored in " + text_rep_fname)
def complete_log_nodes_statistic(cfg, ctx):
@@ -389,15 +443,13 @@
stages = [
load_data_from(opts.post_process_only),
console_report_stage,
- # report_stage
+ report_stage
]
else:
stages = [
discover_stage,
log_nodes_statistic,
- complete_log_nodes_statistic,
connect_stage,
- complete_log_nodes_statistic,
deploy_sensors_stage,
run_tests_stage,
store_raw_results_stage,
@@ -410,7 +462,8 @@
level = logging.DEBUG if opts.extra_logs else logging.WARNING
setup_logger(logger, level, cfg_dict['log_file'])
- logger.info("Store all info into {0}".format(cfg_dict['var_dir']))
+ logger.info("All info would be stored into {0}".format(
+ cfg_dict['var_dir']))
ctx = Context()
ctx.build_meta['build_id'] = opts.build_id
@@ -434,6 +487,7 @@
if exc is not None:
raise exc, cls, tb
+ logger.info("All info stotored into {0}".format(cfg_dict['var_dir']))
return 0
diff --git a/scripts/fio_tests_configs/io_task_test.cfg b/scripts/fio_tests_configs/io_task_test.cfg
index 24d62a9..682fbd1 100644
--- a/scripts/fio_tests_configs/io_task_test.cfg
+++ b/scripts/fio_tests_configs/io_task_test.cfg
@@ -1,29 +1,23 @@
-[writetest * 7]
+[defaults]
group_reporting
-numjobs=4
wait_for_previous
ramp_time=15
-blocksize=4k
filename={FILENAME}
-rw=randwrite
-direct=1
buffered=0
iodepth=1
size=1000Mb
-runtime=30
time_based
-[readtest * 7]
-group_reporting
-numjobs=4
-wait_for_previous
-ramp_time=15
+[writetest * {ROUNDS}]
blocksize=4k
-filename={FILENAME}
+rw=randwrite
+direct=1
+runtime=30
+numjobs=1
+
+[readtest * {ROUNDS}]
+numjobs=4
+blocksize=4k
rw=randread
direct=1
-buffered=0
-iodepth=1
-size=1000Mb
runtime=30
-time_based
diff --git a/ssh_utils.py b/ssh_utils.py
index e546b72..c4a18e8 100644
--- a/ssh_utils.py
+++ b/ssh_utils.py
@@ -1,10 +1,12 @@
import re
import time
+import socket
import logging
import os.path
import getpass
+import threading
-import socket
+
import paramiko
@@ -215,6 +217,7 @@
if rrm is not None:
res.__dict__.update(rrm.groupdict())
return res
+
raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
@@ -224,42 +227,63 @@
return ssh_connect(creds)
-# def get_ssh_runner(uris,
-# conn_func,
-# latest_start_time=None,
-# keep_temp_files=False):
-# logger.debug("Connecting to servers")
+all_sessions_lock = threading.Lock()
+all_sessions = []
-# with ThreadPoolExecutor(max_workers=16) as executor:
-# connections = list(executor.map(connect, uris))
-# result_queue = Queue.Queue()
-# barrier = get_barrier(len(uris), threaded=True)
+def run_over_ssh(conn, cmd, stdin_data=None, timeout=60, nolog=False):
+ "should be replaces by normal implementation, with select"
+ transport = conn.get_transport()
+ session = transport.open_session()
-# def closure(obj):
-# ths = []
-# obj.set_result_cb(result_queue.put)
+ with all_sessions_lock:
+ all_sessions.append(session)
-# params = (obj, barrier, latest_start_time)
+ try:
+ session.set_combine_stderr(True)
-# logger.debug("Start tests")
-# for conn in connections:
-# th = threading.Thread(None, conn_func, None,
-# params + (conn,))
-# th.daemon = True
-# th.start()
-# ths.append(th)
+ stime = time.time()
-# for th in ths:
-# th.join()
+ if not nolog:
+ logger.debug("SSH: Exec {1!r}".format(conn, cmd))
-# test_result = []
-# while not result_queue.empty():
-# test_result.append(result_queue.get())
+ session.exec_command(cmd)
-# logger.debug("Done. Closing connection")
-# for conn in connections:
-# conn.close()
+ if stdin_data is not None:
+ session.sendall(stdin_data)
-# return test_result
-# return closure
+ session.settimeout(1)
+ session.shutdown_write()
+ output = ""
+
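+    # poll in 1s slices (recv timeout set above) so the overall
+    # `timeout` deadline can be enforced between reads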
+ while True:
+ try:
+ ndata = session.recv(1024)
+ output += ndata
+ if "" == ndata:
+ break
+ except socket.timeout:
+ pass
+
+ if time.time() - stime > timeout:
+ raise OSError(output + "\nExecution timeout")
+
+ code = session.recv_exit_status()
+ finally:
+ session.close()
+
+ if code != 0:
+ templ = "Cmd {0!r} failed with code {1}. Output: {2}"
+ raise OSError(templ.format(cmd, code, output))
+
+ return output
+
+
+def close_all_sessions():
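+    # best-effort cleanup: send Ctrl+C to every tracked session, then close it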
+ with all_sessions_lock:
+ for session in all_sessions:
+ try:
+ session.sendall('\x03')
+ session.close()
+            except Exception:
+ pass
diff --git a/start_vms.py b/start_vms.py
index db25665..b872d32 100644
--- a/start_vms.py
+++ b/start_vms.py
@@ -5,6 +5,7 @@
from concurrent.futures import ThreadPoolExecutor
+# from novaclient.exceptions import NotFound
from novaclient.client import Client as n_client
from cinderclient.v1.client import Client as c_client
@@ -43,6 +44,31 @@
NOVA_CONNECTION = None
+# def get_or_create_aa_group(nova, name):
+# try:
+# group = conn.server_groups.find(name=name)
+# except NotFound:
+# group = None
+
+# if group is None:
+# conn.server_groups.create
+
+
+def allow_ssh(nova):
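+    # open TCP/22 and ICMP in the tenant's "default" security group so
+    # freshly booted test VMs are reachable from the test runner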
+ secgroup = nova.security_groups.find(name="default")
+ nova.security_group_rules.create(secgroup.id,
+ ip_protocol="tcp",
+ from_port="22",
+ to_port="22",
+ cidr="0.0.0.0/0")
+
+ nova.security_group_rules.create(secgroup.id,
+ ip_protocol="icmp",
+ from_port=-1,
+ cidr="0.0.0.0/0",
+ to_port=-1)
+
+
def create_keypair(nova, name, key_path):
with open(key_path) as key:
return nova.keypairs.create(name, key.read())
@@ -272,35 +298,3 @@
# exec_on_host("sudo /bin/mkdir /media/ceph")
# exec_on_host("sudo /bin/mount /dev/vdb /media/ceph")
# exec_on_host("sudo /bin/chmod a+rwx /media/ceph")
-
-
-# def main():
-# image_name = 'TestVM'
-# flavor_name = 'ceph'
-# vol_sz = 50
-# network_zone_name = 'net04'
-# amount = 10
-# keypair_name = 'ceph-test'
-
-# nova = nova_connect()
-# clear_all(nova)
-
-# try:
-# ips = []
-# params = dict(vol_sz=vol_sz)
-# params['image_name'] = image_name
-# params['flavor_name'] = flavor_name
-# params['network_zone_name'] = network_zone_name
-# params['amount'] = amount
-# params['keypair_name'] = keypair_name
-
-# for ip, host in create_vms(nova, **params):
-# ips.append(ip)
-
-# print "All setup done! Ips =", " ".join(ips)
-# print "Starting tests"
-# finally:
-# clear_all(nova)
-
-# if __name__ == "__main__":
-# exit(main())
diff --git a/tests/disk_test_agent.py b/tests/disk_test_agent.py
index 5de3038..b129175 100644
--- a/tests/disk_test_agent.py
+++ b/tests/disk_test_agent.py
@@ -76,8 +76,15 @@
if processed_vals.get('numjobs', '1') != '1':
assert 'group_reporting' in processed_vals, group_report_err_msg
+ ramp_time = processed_vals.get('ramp_time')
for i in range(repeat):
yield name.format(**params), processed_vals.copy()
+
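+        # ramp only applies to the first run of a repeated job, so drop
+        # ramp_time after the first yield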
+ if 'ramp_time' in processed_vals:
+ del processed_vals['ramp_time']
+
+    if ramp_time is not None:
+        processed_vals['ramp_time'] = ramp_time
else:
for it_vals in itertools.product(*iterable_values):
processed_vals.update(dict(zip(iterable_names, it_vals)))
@@ -377,6 +384,20 @@
yield bconf
+def get_test_sync_mode(jconfig):
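+    # s - O_SYNC, d - O_DIRECT, sd - both, a - buffered (async)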
+ is_sync = jconfig.get("sync", "0") == "1"
+ is_direct = jconfig.get("direct_io", "0") == "1"
+
+ if is_sync and is_direct:
+ return 'sd'
+ elif is_sync:
+ return 's'
+ elif is_direct:
+ return 'd'
+ else:
+ return 'a'
+
+
def add_job_results(jname, job_output, jconfig, res):
if job_output['write']['iops'] != 0:
raw_result = job_output['write']
@@ -386,8 +407,7 @@
if jname not in res:
j_res = {}
j_res["action"] = jconfig["rw"]
- j_res["direct_io"] = jconfig.get("direct", "0") == "1"
- j_res["sync"] = jconfig.get("sync", "0") == "1"
+ j_res["sync_mode"] = get_test_sync_mode(jconfig)
j_res["concurence"] = int(jconfig.get("numjobs", 1))
j_res["blocksize"] = jconfig["blocksize"]
j_res["jobname"] = job_output["jobname"]
@@ -396,22 +416,21 @@
else:
j_res = res[jname]
assert j_res["action"] == jconfig["rw"]
-
- assert j_res["direct_io"] == \
- (jconfig.get("direct", "0") == "1")
-
- assert j_res["sync"] == (jconfig.get("sync", "0") == "1")
+ assert j_res["sync_mode"] == get_test_sync_mode(jconfig)
assert j_res["concurence"] == int(jconfig.get("numjobs", 1))
assert j_res["blocksize"] == jconfig["blocksize"]
assert j_res["jobname"] == job_output["jobname"]
- assert j_res["timings"] == (jconfig.get("runtime"),
- jconfig.get("ramp_time"))
+
+            # ramp part is skipped for all tests except the first
+ # assert j_res["timings"] == (jconfig.get("runtime"),
+ # jconfig.get("ramp_time"))
def j_app(name, x):
j_res.setdefault(name, []).append(x)
# 'bw_dev bw_mean bw_max bw_min'.split()
- j_app("bw_mean", raw_result["bw_mean"])
+            # workaround for a probable fio bug: iops is scaled by job count, but bw isn't
+ j_app("bw_mean", raw_result["bw_mean"] * j_res["concurence"])
j_app("iops", raw_result["iops"])
j_app("lat", raw_result["lat"]["mean"])
j_app("clat", raw_result["clat"]["mean"])
@@ -457,7 +476,7 @@
add_job_results(jname, job_output, jconfig, res)
except (SystemExit, KeyboardInterrupt):
- pass
+ raise
except Exception:
traceback.print_exc()
@@ -471,6 +490,37 @@
raise ValueError("Unknown behcnmark {0}".format(binary_tp))
+def read_config(fd, timeout=10):
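+    # read the fio job config from fd char-by-char, using select so a
+    # silent sender fails with IOError once `timeout` expires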
+ job_cfg = ""
+ etime = time.time() + timeout
+ while True:
+ wtime = etime - time.time()
+ if wtime <= 0:
+ raise IOError("No config provided")
+
+ r, w, x = select.select([fd], [], [], wtime)
+ if len(r) == 0:
+ raise IOError("No config provided")
+
+ char = fd.read(1)
+ if '' == char:
+ return job_cfg
+
+ job_cfg += char
+
+
+def estimate_cfg(job_cfg, params):
+ bconf = list(parse_fio_config_full(job_cfg, params))
+ return calculate_execution_time(bconf)
+
+
+def sec_to_str(seconds):
+ h = seconds // 3600
+ m = (seconds % 3600) // 60
+ s = seconds % 60
+ return "{0}:{1:02d}:{2:02d}".format(h, m, s)
+
+
def parse_args(argv):
parser = argparse.ArgumentParser(
description="Run fio' and return result")
@@ -506,25 +556,6 @@
return parser.parse_args(argv)
-def read_config(fd, timeout=10):
- job_cfg = ""
- etime = time.time() + timeout
- while True:
- wtime = etime - time.time()
- if wtime <= 0:
- raise IOError("No config provided")
-
- r, w, x = select.select([fd], [], [], wtime)
- if len(r) == 0:
- raise IOError("No config provided")
-
- char = fd.read(1)
- if '' == char:
- return job_cfg
-
- job_cfg += char
-
-
def main(argv):
argv_obj = parse_args(argv)
@@ -544,7 +575,11 @@
name, val = param_val.split("=", 1)
params[name] = val
- if argv_obj.num_tests or argv_obj.compile or argv_obj.estimate:
+ if argv_obj.estimate:
+ print sec_to_str(estimate_cfg(job_cfg, params))
+ return 0
+
+ if argv_obj.num_tests or argv_obj.compile:
bconf = list(parse_fio_config_full(job_cfg, params))
bconf = bconf[argv_obj.skip_tests:]
@@ -555,14 +590,6 @@
if argv_obj.num_tests:
print len(bconf)
- if argv_obj.estimate:
- seconds = calculate_execution_time(bconf)
-
- h = seconds // 3600
- m = (seconds % 3600) // 60
- s = seconds % 60
-
- print "{0}:{1}:{2}".format(h, m, s)
return 0
if argv_obj.start_at is not None:
@@ -589,11 +616,11 @@
argv_obj.faked_fio)
etime = time.time()
- res = {'__meta__': {'raw_cfg': job_cfg}, 'res': job_res}
+ res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}
oformat = 'json' if argv_obj.json else 'eval'
- out_fd.write("\nRun {} tests in {} seconds\n".format(num_tests,
- int(etime - stime)))
+ out_fd.write("\nRun {0} tests in {1} seconds\n".format(num_tests,
+ int(etime - stime)))
out_fd.write("========= RESULTS(format={0}) =========\n".format(oformat))
if argv_obj.json:
out_fd.write(json.dumps(res))
diff --git a/tests/io_scenario_check_th_count.cfg b/tests/io_scenario_check_th_count.cfg
index 478439e..3d57154 100644
--- a/tests/io_scenario_check_th_count.cfg
+++ b/tests/io_scenario_check_th_count.cfg
@@ -9,19 +9,38 @@
time_based
runtime=30
group_reporting
+numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
# ---------------------------------------------------------------------
# check different thread count. (latency, bw) = func(th_count)
+#
+# RANDOM R IOPS, DIRECT - should act the same as (4k + randread + sync),
+# just faster. Not sure that we need it
+# 4k + randread + direct
+#
+# RANDOM R/W IOPS
+# 4k + randread + sync
+# 4k + randwrite + sync
+#
+# LINEAR BW
+# 1m + write + direct
+# 1m + read + direct
+#
# ---------------------------------------------------------------------
[concurrence_test_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize={% 4k, 1m %}
-rw={% randwrite, randread %}
+blocksize=4k
+rw={% randread %}
direct=1
-numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
+sync=0
[concurrence_test_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize={% 4k, 1m %}
+blocksize=4k
rw=randwrite
direct=0
sync=1
-numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
+
+[concurrence_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=1m
+rw={% write, read %}
+direct=1
+sync=0
diff --git a/tests/io_scenario_check_vm_count_ec2.cfg b/tests/io_scenario_check_vm_count_ec2.cfg
new file mode 100644
index 0000000..19c9e50
--- /dev/null
+++ b/tests/io_scenario_check_vm_count_ec2.cfg
@@ -0,0 +1,29 @@
+[defaults]
+NUM_ROUNDS=7
+ramp_time=5
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+group_reporting
+rate={BW_LIMIT}
+rate_iops={IOPS_LIMIT}
+
+# ---------------------------------------------------------------------
+# check different thread count. (latency, bw) = func(th_count)
+# ---------------------------------------------------------------------
+[vm_count_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 4k, 1m %}
+rw={% randwrite, randread %}
+direct=1
+numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
+
+[vm_count_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=0
+sync=1
+numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
diff --git a/tests/io_scenario_hdd.cfg b/tests/io_scenario_hdd.cfg
index 3238503..0c36324 100644
--- a/tests/io_scenario_hdd.cfg
+++ b/tests/io_scenario_hdd.cfg
@@ -6,14 +6,14 @@
iodepth=1
filename={FILENAME}
-NUM_ROUNDS={NUM_ROUNDS}
+NUM_ROUNDS=7
ramp_time=5
size=10Gb
runtime=30
# ---------------------------------------------------------------------
-# check different thread count, sync mode. (latency, bw) = func(th_count)
+# check different thread count, sync mode. (latency, iops) = func(th_count)
# ---------------------------------------------------------------------
[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
blocksize={% 4k %}
@@ -22,7 +22,8 @@
numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
# ---------------------------------------------------------------------
-# check different thread count, direct read mode. (latency, bw) = func(th_count)
+# check different thread count, direct read mode. (latency, iops) = func(th_count)
+# also check iops for randread
# ---------------------------------------------------------------------
[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
blocksize={% 4k %}
@@ -31,17 +32,20 @@
numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
# ---------------------------------------------------------------------
-# check IOPS read/write. (latency, bw) = func(th_count)
+# check different thread count, direct read/write mode. (bw, iops) = func(th_count)
+# also check BW for seq read/write.
+# ---------------------------------------------------------------------
+[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 1m %}
+rw={% read, write %}
+direct=1
+numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+
+# ---------------------------------------------------------------------
+# check IOPS randwrite.
# ---------------------------------------------------------------------
[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
blocksize=4k
-rw={% randwrite, randread %}
+rw=randwrite
direct=1
-# ---------------------------------------------------------------------
-# check BW for seq read/write. (latency, bw) = func(th_count)
-# ---------------------------------------------------------------------
-[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize=1m
-rw={% write, read %}
-direct=1
diff --git a/tests/io_scenario_long_test.cfg b/tests/io_scenario_long_test.cfg
new file mode 100644
index 0000000..b1a40d9
--- /dev/null
+++ b/tests/io_scenario_long_test.cfg
@@ -0,0 +1,20 @@
+[defaults]
+# 24h test
+NUM_ROUNDS=288
+
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=50Gb
+time_based
+runtime=300
+
+# ---------------------------------------------------------------------
+# check read and write linearity. oper_time = func(size)
+# ---------------------------------------------------------------------
+[24h_test * {NUM_ROUNDS}]
+blocksize=128k
+rw=randwrite
+direct=1
+
diff --git a/tests/itest.py b/tests/itest.py
index a048703..3b71d3f 100644
--- a/tests/itest.py
+++ b/tests/itest.py
@@ -5,9 +5,11 @@
from disk_perf_test_tool.tests import disk_test_agent
from disk_perf_test_tool.tests.disk_test_agent import parse_fio_config_full
+from disk_perf_test_tool.tests.disk_test_agent import estimate_cfg, sec_to_str
from disk_perf_test_tool.tests.io_results_loader import parse_output
-from disk_perf_test_tool.ssh_utils import copy_paths
-from disk_perf_test_tool.utils import run_over_ssh, ssize_to_b
+from disk_perf_test_tool.ssh_utils import copy_paths, run_over_ssh
+from disk_perf_test_tool.utils import ssize_to_b
+
logger = logging.getLogger("io-perf-tool")
@@ -96,33 +98,45 @@
self.config_params = test_options.get('params', {})
self.tool = test_options.get('tool', 'fio')
self.raw_cfg = open(self.config_fname).read()
- self.configs = parse_fio_config_full(self.raw_cfg, self.config_params)
+ self.configs = list(parse_fio_config_full(self.raw_cfg,
+ self.config_params))
def pre_run(self, conn):
- # TODO: install fio, if not installed
- cmd = "sudo apt-get -y install fio"
+ try:
+ run_over_ssh(conn, 'which fio')
+ except OSError:
+ # TODO: install fio, if not installed
+ cmd = "sudo apt-get -y install fio"
- for i in range(3):
- try:
- run_over_ssh(conn, cmd)
- break
- except OSError:
- time.sleep(3)
+ for i in range(3):
+ try:
+ run_over_ssh(conn, cmd)
+ break
+ except OSError as err:
+ time.sleep(3)
+ else:
+ raise OSError("Can't install fio - " + err.message)
local_fname = disk_test_agent.__file__.rsplit('.')[0] + ".py"
self.files_to_copy = {local_fname: self.io_py_remote}
copy_paths(conn, self.files_to_copy)
cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+ files = {}
+
for secname, params in self.configs:
sz = ssize_to_b(params['size'])
            msz = sz / (1024 ** 2)
if sz % (1024 ** 2) != 0:
msz += 1
- cmd = cmd_templ.format(params['filename'], 1024 ** 2, msz)
- run_over_ssh(conn, cmd)
+ fname = params['filename']
+ files[fname] = max(files.get(fname, 0), msz)
+
+ for fname, sz in files.items():
+            cmd = cmd_templ.format(fname, 1024 ** 2, sz)
+            run_over_ssh(conn, cmd, timeout=sz)
def run(self, conn, barrier):
cmd_templ = "env python2 {0} --type {1} {2} --json -"
@@ -135,14 +149,22 @@
cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
logger.debug("Waiting on barrier")
+
+ exec_time = estimate_cfg(self.raw_cfg, self.config_params)
+ exec_time_str = sec_to_str(exec_time)
+
try:
- barrier.wait()
- logger.debug("Run {0}".format(cmd))
- out_err = run_over_ssh(conn, cmd, stdin_data=self.raw_cfg)
- self.on_result(out_err, cmd)
+            if barrier.wait():
+                logger.info("Test will take about {0}".format(exec_time_str))
+
+            out_err = run_over_ssh(conn, cmd,
+                                   stdin_data=self.raw_cfg,
+                                   timeout=int(exec_time * 1.1))
finally:
barrier.exit()
+ self.on_result(out_err, cmd)
+
def on_result(self, out_err, cmd):
try:
for data in parse_output(out_err):
diff --git a/utils.py b/utils.py
index 11b38ae..5a9f188 100644
--- a/utils.py
+++ b/utils.py
@@ -1,8 +1,8 @@
-import time
-import socket
+import re
import logging
import threading
import contextlib
+import subprocess
logger = logging.getLogger("io-perf-tool")
@@ -40,8 +40,10 @@
if self.curr_count == self.count:
self.curr_count = 0
self.cond.notify_all()
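+            # only the thread that releases the barrier gets True, so
+            # exactly one caller may run leader-only actions per cycle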
+ return True
else:
self.cond.wait(timeout=timeout)
+ return False
def exit(self):
with self.cond:
@@ -64,46 +66,6 @@
raise
-def run_over_ssh(conn, cmd, stdin_data=None, exec_timeout=5 * 60 * 60):
- "should be replaces by normal implementation, with select"
- transport = conn.get_transport()
- session = transport.open_session()
- try:
- session.set_combine_stderr(True)
-
- stime = time.time()
- session.exec_command(cmd)
-
- if stdin_data is not None:
- session.sendall(stdin_data)
-
- session.settimeout(1)
- session.shutdown_write()
- output = ""
-
- while True:
- try:
- ndata = session.recv(1024)
- output += ndata
- if "" == ndata:
- break
- except socket.timeout:
- pass
-
- if time.time() - stime > exec_timeout:
- raise OSError(output + "\nExecution timeout")
-
- code = session.recv_exit_status()
- finally:
- session.close()
-
- if code != 0:
- templ = "Cmd {0!r} failed with code {1}. Output: {2}"
- raise OSError(templ.format(cmd, code, output))
-
- return output
-
-
SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4)
@@ -118,3 +80,20 @@
return int(ssize)
except (ValueError, TypeError, AttributeError):
raise ValueError("Unknow size format {0!r}".format(ssize))
+
+
+def get_ip_for_target(target_ip):
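+    # parse `ip route get` output to find the local source IP used to
+    # reach target_ip; assumes a gatewayed route ("... via G dev D src IP")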
+ cmd = 'ip route get to'.split(" ") + [target_ip]
+ data = subprocess.Popen(cmd, stdout=subprocess.PIPE).stdout.read()
+
+ rr = r'{0} via [.0-9]+ dev (?P<dev>.*?) src (?P<ip>[.0-9]+)$'
+ rr = rr.replace(" ", r'\s+')
+ rr = rr.format(target_ip.replace('.', r'\.'))
+
+ data_line = data.split("\n")[0].strip()
+ res = re.match(rr, data_line)
+
+ if res is None:
+ raise OSError("Can't define interface for {0}".format(target_ip))
+
+ return res.group('ip')