Large commit: refactoring; fio code totally reworked; huge improvement in test time and results, etc.
diff --git a/TODO b/TODO
index 9de2aaa..8b13789 100644
--- a/TODO
+++ b/TODO
@@ -1,59 +1 @@
-go over each file and make sure, that we need it
-fix pgbench
-add ability to store all data into local file (bson/msgpack?)
-update report generation code
-allow to run package directly
-make a normal python package
-add options for username, build description, etc
-provide API for retreiving data
-fuel port forwarding and censor data collecting
-store all taken actions in special file and add 'clear' command to clear results
-or make a separated watchdog process, which would clear results
-
-INTEGRATE WITH BLKTRACE/SEEKWATCHER/perf
-
-!!!!add warm-up stage to each fio test!!!!!! and make io.py to ignore results from it
-
-create first release
-
-WE NEED MORE HIGHT_LEVEL TESTS
-IMPLEMENT TEMPLATE FROM
-
-Gchart looks really like 2000. Please investigate more
-modern-looking tools, like http://pygal.org/, http://www.flotcharts.org/
-and hundreds of thousands of javascript-python-c++ tools -
-https://wiki.python.org/moin/NumericAndScientific/Plotting,
-http://www.rgraph.net/examples/bar-line-and-pie-charts.html, ...
-http://www.sitepoint.com/15-best-javascript-charting-libraries/
-
-
-make nodes.discover.discover pluggable
-
-fix grafana + influx installation
-
-allow to owerwrite config options from command=line
-
-ceph sensors support
-fio package deployment on test nodes
-send data to influx
-integrate with grafana
-integrated with db
-collected lab data during test (check and fix this code)
-
-need to store complete hardware information (e.g. like in
-certification, but even more, but with controllable levels).
-It also should dump network topology.
-
-make a separated library for gathering hw info for node
-
-create a wiki page with tool description
-create a wiki page with usage descriptions
-create a wiki page with test selection description
- and where all calculations explained
-create a wiki page with test selection evaluation and provement
-create a separated repos for config
-
-
-Test methodology:
- https://software.intel.com/en-us/blogs/2013/10/25/measure-ceph-rbd-performance-in-a-quantitative-way-part-i
diff --git a/config.py b/config.py
index a33f9b5..6a162b8 100644
--- a/config.py
+++ b/config.py
@@ -20,17 +20,17 @@
# parser.add_argument("-c", "--chartpath")
# config = parser.parse_args(sys.argv[1:])
-path = "config.yaml"
+path = "koder.yaml"
# if config.path is not None:
# path = config.path
-cfg_dict = parse_config(os.path.join(os.path.dirname(__file__), path))
-basedir = cfg_dict['paths']['basedir']
-TEST_PATH = cfg_dict['paths']['TEST_PATH']
-SQLALCHEMY_MIGRATE_REPO = cfg_dict['paths']['SQLALCHEMY_MIGRATE_REPO']
-DATABASE_URI = cfg_dict['paths']['DATABASE_URI']
-CHARTS_IMG_PATH = cfg_dict['paths']['CHARTS_IMG_PATH']
+cfg_dict = parse_config(path)
+# basedir = cfg_dict['paths']['basedir']
+# TEST_PATH = cfg_dict['paths']['TEST_PATH']
+# SQLALCHEMY_MIGRATE_REPO = cfg_dict['paths']['SQLALCHEMY_MIGRATE_REPO']
+# DATABASE_URI = cfg_dict['paths']['DATABASE_URI']
+# CHARTS_IMG_PATH = cfg_dict['paths']['CHARTS_IMG_PATH']
# if config.basedir is not None:
# basedir = config.basedir
diff --git a/config.yaml b/config.yaml
index b365042..da94a8a 100644
--- a/config.yaml
+++ b/config.yaml
@@ -1,70 +1,83 @@
-# nodes to be started/detected
-discover:
- # openstack:
- # connection:
- # auth_url: http://172.16.54.130:5000/v2.0
- # creds: admin:admin@admin
- # discover:
- # vm:
- # name: test1
- # auth: cirros:cubswin:)
- # nodes:
- # service: all
- # auth: cirros:cubswin:)
-
- # ceph: root@172.16.53.2::/home/koder/.ssh/id_rsa.pub
+clouds:
ceph: local
- # fuel:
- # connection:
- # url: http://172.16.52.120:8000/
- # creds: admin:admin@admin
- # discover: controller
+discover: ceph
explicit_nodes:
"ssh://192.168.152.43": testnode
-# start:
-# count: x1
-# img_name: TestVM
-# flavor_name: m1.micro
-# keypair_name: test
-# network_zone_name: net04
-# flt_ip_pool: net04_ext
-# key_file: path/to/
-# user: username
-
-
-# sensors to be installed, accordingli to role
sensors:
- receiver_uri: udp://192.168.0.108:5699
+ receiver_uri: udp://192.168.152.1:5699
roles_mapping:
- ceph-osd: block-io #ceph-io, ceph-cpu, ceph-ram, ceph-net
- # ceph-mon: ceph-io, ceph-cpu, ceph-ram, ceph-net
- os-compute: io, net
- test-vm: system-cpu
+ ceph-osd: block-io
+ testnode: system-cpu, block-io
-
-# tests to run
tests:
- # pgbench:
- # opts:
- # num_clients: [4, 8, 12]
- # transactions: [1, 2, 3]
- io:
- tool: fio
- config_file: io_task.cfg
-
-# where to store results
-results:
- - mos-linux-http://172.12.33.45
-
-paths:
- basedir: "/home/gstepanov/rally-results-processor"
- TEST_PATH: "/home/gstepanov/rally-results-processor/test_results"
- CHARTS_IMG_PATH: "static/images"
- SQLALCHEMY_MIGRATE_REPO: "/home/gstepanov/rally-results-processor/db_repository"
- DATABASE_URI: 'sqlite:////home/gstepanov/rally-results-processor/app.db?check_same_thread=False'
+ - io: tests/io_task_test.cfg
logging:
- extra_logs: 1
\ No newline at end of file
+ extra_logs: 1
+
+
+# # nodes to be started/detected
+# clouds:
+# # openstack: file:///tmp/openrc
+# # openstack: user:passwd:tenant, http://......
+# # ceph: ssh://root@172.16.53.2::/home/koder/.ssh/id_rsa.pub
+# fuel:
+# url: http://172.16.52.112:8000
+# creds: admin:admin:admin
+# ssh_creds: root:test37
+# openstack_env: test
+# ceph: local
+
+# # discover: fuel+openstack, ceph
+
+# explicit_nodes:
+# "ssh://192.168.152.43": testnode
+
+# # sensors to be installed, accordingly to role
+# sensors:
+# receiver_uri: udp://192.168.152.1:5699
+# roles_mapping:
+# ceph-osd: block-io #ceph-io, ceph-cpu, ceph-ram, ceph-net
+# # ceph-mon: ceph-io, ceph-cpu, ceph-ram, ceph-net
+# os-compute: io, net
+# test-vm: system-cpu
+
+# # tests to run
+# tests: #$include(tests.yaml)
+# # - with_test_nodes:
+# # openstack:
+# # vm_params:
+# # count: x1
+# # img_name: disk_io_perf
+# # flavor_name: disk_io_perf.256
+# # keypair_name: disk_io_perf
+# # network_zone_name: novanetwork
+# # flt_ip_pool: nova
+# # creds: "ssh://ubuntu@{0}::disk_io_perf.pem"
+# # tests:
+# # - pgbench:
+# # opts:
+# # num_clients: [4, 8, 12]
+# # transactions: [1, 2, 3]
+# - io: tests/io_task_test.cfg
+
+# # - vm_count:
+# # max_lat_ms: 20
+# # min_bw_mbps: 60
+# # min_4k_direct_w_iops: 100
+# # min_4k_direct_r_iops: 100
+
+# # where to store results
+# # results:
+# # - mos-linux-http://172.12.33.45
+# # - bson, /tmp/myrun.bson
+
+# paths:
+# basedir: "/home/gstepanov/rally-results-processor"
+# TEST_PATH: "/home/gstepanov/rally-results-processor/test_results"
+# CHARTS_IMG_PATH: "static/images"
+# SQLALCHEMY_MIGRATE_REPO: "/home/gstepanov/rally-results-processor/db_repository"
+# DATABASE_URI: 'sqlite:////home/gstepanov/rally-results-processor/app.db?check_same_thread=False'
diff --git a/data_processing.py b/data_processing.py
index 387ea3f..b15c579 100644
--- a/data_processing.py
+++ b/data_processing.py
@@ -62,4 +62,4 @@
for k in m.results.keys():
m.results[k] = [mean(m.results[k]), stdev(m.results[k])]
- return m
\ No newline at end of file
+ return m
diff --git a/fuel_rest_api.py b/fuel_rest_api.py
index 12b53bb..4b02687 100644
--- a/fuel_rest_api.py
+++ b/fuel_rest_api.py
@@ -442,7 +442,7 @@
yield Cluster(conn, **cluster_desc)
-def get_cluster_id(name, conn):
+def get_cluster_id(conn, name):
"""Get cluster id by name"""
for cluster in get_all_clusters(conn):
if cluster.name == name:
@@ -451,6 +451,8 @@
logger.debug('cluster id is %s' % cluster.id)
return cluster.id
+ raise ValueError("Cluster {0} not found".format(name))
+
sections = {
'sahara': 'additional_components',
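Note on the change above: get_cluster_id now raises instead of silently returning None when the name is unknown, so callers can fail fast. Expected usage (a sketch, names hypothetical):

    try:
        cluster_id = get_cluster_id(conn, "prod")
    except ValueError:
        # no cluster with that name on this Fuel master
        raise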
diff --git a/koder.yaml b/koder.yaml
new file mode 100644
index 0000000..9868670
--- /dev/null
+++ b/koder.yaml
@@ -0,0 +1,23 @@
+clouds:
+ ceph: local
+
+discover: ceph
+
+explicit_nodes:
+ "ssh://192.168.152.43": testnode
+
+sensors:
+ receiver_uri: udp://192.168.152.1:5699
+ roles_mapping:
+ ceph-osd: block-io
+ testnode: system-cpu, block-io
+
+tests:
+ - io:
+ cfg: tests/io_task_test.cfg
+ params:
+ SOME_OPT: 12
+
+logging:
+ extra_logs: 1
+
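The config above is plain YAML; a minimal sketch of how it is consumed (assuming PyYAML — parse_config() in config.py presumably wraps an equivalent load, and run_test.py iterates 'tests' as a list of one-key mappings):

    import yaml

    with open("koder.yaml") as f:
        cfg = yaml.safe_load(f)

    # 'discover' is a comma-separated string of cluster types
    discover_objs = [i.strip() for i in cfg['discover'].split(",")]

    # 'tests' is a list of one-key mappings: [{'io': {'cfg': ..., 'params': {...}}}]
    for test in cfg['tests']:
        for name, params in test.items():
            print name, params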
diff --git a/nodes/discover.py b/nodes/discover.py
index b95d306..62da457 100644
--- a/nodes/discover.py
+++ b/nodes/discover.py
@@ -1,4 +1,5 @@
import logging
+import urlparse
import ceph
import fuel
@@ -9,14 +10,11 @@
logger = logging.getLogger("io-perf-tool")
-def discover(cluster_conf):
- if not cluster_conf:
- logger.error("No nodes configured")
-
+def discover(discover_list, clusters_info):
nodes_to_run = []
- for cluster, cluster_info in cluster_conf.items():
+    for cluster in discover_list:
if cluster == "openstack":
-
+ cluster_info = clusters_info["openstack"]
conn = cluster_info['connection']
user, passwd, tenant = parse_creds(conn['creds'])
@@ -39,17 +37,31 @@
cluster_info)
nodes_to_run.extend(os_nodes)
- if cluster == "fuel":
- url = cluster_info['connection'].pop('url')
- creads = cluster_info['connection']
- roles = cluster_info['discover']
+ elif cluster == "fuel" or cluster == "fuel+openstack":
+ cluster_info = clusters_info['fuel']
+ url = cluster_info['url']
+ creds = cluster_info['creds']
+ ssh_creds = cluster_info['ssh_creds']
+        # if user:password format is used
+ if not ssh_creds.startswith("ssh://"):
+ ip_port = urlparse.urlparse(url).netloc
- if isinstance(roles, basestring):
- roles = [roles]
+ if ':' in ip_port:
+ ip = ip_port.split(":")[0]
+ else:
+ ip = ip_port
- nodes_to_run.extend(fuel.discover_fuel_nodes(url, creads, roles))
+ ssh_creds = "ssh://{0}@{1}".format(ssh_creds, ip)
- if cluster == "ceph":
+ env = cluster_info['openstack_env']
+
+ nodes_to_run.extend(fuel.discover_fuel_nodes(url, creds, env))
+
+ elif cluster == "ceph":
+ cluster_info = clusters_info["ceph"]
nodes_to_run.extend(ceph.discover_ceph_nodes(cluster_info))
+ else:
+ msg_templ = "Unknown cluster type in 'discover' parameter: {0!r}"
+ raise ValueError(msg_templ.format(cluster))
return nodes_to_run
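For reference, the ssh_creds normalization above turns plain user:password credentials plus the Fuel REST URL into a full ssh URL; a standalone sketch of the same logic (function name hypothetical):

    import urlparse

    def normalize_ssh_creds(ssh_creds, fuel_url):
        # a full ssh:// url passes through unchanged
        if ssh_creds.startswith("ssh://"):
            return ssh_creds
        # otherwise take the Fuel master IP from its REST url
        ip_port = urlparse.urlparse(fuel_url).netloc
        ip = ip_port.split(":")[0] if ':' in ip_port else ip_port
        return "ssh://{0}@{1}".format(ssh_creds, ip)

    assert normalize_ssh_creds("root:test37", "http://172.16.52.112:8000") == \
        "ssh://root:test37@172.16.52.112"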
diff --git a/nodes/fuel.py b/nodes/fuel.py
index 25476dc..9b1312e 100644
--- a/nodes/fuel.py
+++ b/nodes/fuel.py
@@ -1,18 +1,18 @@
import logging
-import node
+from node import Node
import fuel_rest_api
-from disk_perf_test_tool.utils import parse_creds
logger = logging.getLogger("io-perf-tool")
-def discover_fuel_nodes(root_url, credentials, roles):
+def discover_fuel_nodes(root_url, credentials, cluster_name):
"""Discover Fuel nodes"""
- user, passwd, tenant = parse_creds(credentials['creds'])
-
+ assert credentials.count(':') >= 2
+ user, passwd_tenant = credentials.split(":", 1)
+ passwd, tenant = passwd_tenant.rsplit(":", 1)
creds = dict(
username=user,
password=passwd,
@@ -21,8 +21,14 @@
connection = fuel_rest_api.KeystoneAuth(root_url, creds)
fi = fuel_rest_api.FuelInfo(connection)
+
+    cluster_id = fuel_rest_api.get_cluster_id(connection, cluster_name)
+
nodes = []
- for role in roles:
- nodes.extend(getattr(fi.nodes, role))
- logger.debug("Found %s fuel nodes" % len(fi.nodes))
- return [node.Node(n.ip, n.get_roles()) for n in nodes]
+
+ for node in fi.nodes:
+        if node.cluster == cluster_id:
+ nodes.append(node)
+ res = [Node(n.ip, n.get_roles()) for n in nodes]
+ logger.debug("Found %s fuel nodes for env %r" % (len(res), cluster_name))
+ return res
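The split/rsplit pair above parses user:password:tenant credentials while still allowing colons inside the password; a quick illustration:

    creds = "admin:pa:ss:admin"
    user, passwd_tenant = creds.split(":", 1)      # 'admin', 'pa:ss:admin'
    passwd, tenant = passwd_tenant.rsplit(":", 1)  # 'pa:ss', 'admin'
    assert (user, passwd, tenant) == ("admin", "pa:ss", "admin")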
diff --git a/nodes/openstack.py b/nodes/openstack.py
index ab41f62..d212283 100644
--- a/nodes/openstack.py
+++ b/nodes/openstack.py
@@ -63,9 +63,9 @@
client = Client(version='1.1', **conn_details)
nodes = []
if conf.get('discover'):
- vms_to_discover = conf['discover'].get('vm')
- if vms_to_discover:
- nodes.extend(discover_vms(client, vms_to_discover))
+ # vms_to_discover = conf['discover'].get('vm')
+ # if vms_to_discover:
+ # nodes.extend(discover_vms(client, vms_to_discover))
services_to_discover = conf['discover'].get('nodes')
if services_to_discover:
nodes.extend(discover_services(client, services_to_discover))
diff --git a/results/untime_variation_no_wa.txt b/results/untime_variation_no_wa.txt
new file mode 100644
index 0000000..d985cfd
--- /dev/null
+++ b/results/untime_variation_no_wa.txt
@@ -0,0 +1,121 @@
+{'__meta__': {'raw_cfg': '[writetest_10 * 7]\nstartdelay=10\nnumjobs=1\nblocksize=4k\nfilename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin\nrw=randwrite\ndirect=1\nbuffered=0\niodepth=1\nsize=1Gb\nruntime=10\ntime_based\nwait_for_previous\n\n[writetest_20 * 7]\nstartdelay=10\nnumjobs=1\nblocksize=4k\nfilename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin\nrw=randwrite\ndirect=1\nbuffered=0\niodepth=1\nsize=1Gb\nruntime=20\ntime_based\nwait_for_previous\n\n[writetest_30 * 7]\nstartdelay=10\nnumjobs=1\nblocksize=4k\nfilename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin\nrw=randwrite\ndirect=1\nbuffered=0\niodepth=1\nsize=1Gb\nruntime=30\ntime_based\nwait_for_previous\n\n[writetest_120 * 7]\nstartdelay=10\nnumjobs=1\nblocksize=4k\nfilename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin\nrw=randwrite\ndirect=1\nbuffered=0\niodepth=1\nsize=1Gb\nruntime=120\ntime_based\nwait_for_previous\n'},
+ 'res': {u'writetest_10': {'action': 'randwrite',
+ 'bw_mean': [1042.83,
+ 940.67,
+ 930.06,
+ 894.17,
+ 891.28,
+ 915.72,
+ 902.28],
+ 'clat': [4009.81,
+ 4522.23,
+ 4529.23,
+ 4767.7,
+ 4764.96,
+ 4651.26,
+ 4716.1],
+ 'concurence': 1,
+ 'direct_io': True,
+ 'iops': [249, 220, 220, 209, 209, 214, 211],
+ 'jobname': u'writetest_10',
+ 'lat': [4010.17,
+ 4522.6,
+ 4529.58,
+ 4768.08,
+ 4765.32,
+ 4651.62,
+ 4716.46],
+ 'size': '1Gb',
+ 'slat': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
+ 'sync': False,
+ 'timings': ('10', '10')},
+ u'writetest_120': {'action': 'randwrite',
+ 'bw_mean': [772.65,
+ 862.57,
+ 874.52,
+ 877.36,
+ 815.79,
+ 746.11,
+ 805.6],
+ 'clat': [5637.01,
+ 5054.7,
+ 4973.0,
+ 4989.0,
+ 5334.4,
+ 5826.8,
+ 5408.51],
+ 'concurence': 1,
+ 'direct_io': True,
+ 'iops': [177, 197, 200, 200, 187, 171, 184],
+ 'jobname': u'writetest_120',
+ 'lat': [5637.37,
+ 5055.07,
+ 4973.37,
+ 4989.36,
+ 5334.79,
+ 5827.16,
+ 5408.87],
+ 'size': '1Gb',
+ 'slat': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
+ 'sync': False,
+ 'timings': ('120', '10')},
+ u'writetest_20': {'action': 'randwrite',
+ 'bw_mean': [908.22,
+ 898.28,
+ 888.81,
+ 904.74,
+ 895.08,
+ 894.42,
+ 890.42],
+ 'clat': [4738.29,
+ 4762.78,
+ 4826.89,
+ 4822.72,
+ 4847.32,
+ 4785.83,
+ 4849.56],
+ 'concurence': 1,
+ 'direct_io': True,
+ 'iops': [210, 209, 206, 207, 206, 208, 205],
+ 'jobname': u'writetest_20',
+ 'lat': [4738.68,
+ 4763.16,
+ 4827.26,
+ 4823.09,
+ 4847.67,
+ 4786.18,
+ 4849.95],
+ 'size': '1Gb',
+ 'slat': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
+ 'sync': False,
+ 'timings': ('20', '10')},
+ u'writetest_30': {'action': 'randwrite',
+ 'bw_mean': [880.89,
+ 878.45,
+ 868.5,
+ 854.61,
+ 777.02,
+ 748.49,
+ 729.09],
+ 'clat': [4874.05,
+ 4927.75,
+ 4979.63,
+ 5080.85,
+ 5558.3,
+ 5753.93,
+ 5925.94],
+ 'concurence': 1,
+ 'direct_io': True,
+ 'iops': [204, 202, 200, 196, 179, 173, 168],
+ 'jobname': u'writetest_30',
+ 'lat': [4874.44,
+ 4928.12,
+ 4980.01,
+ 5081.22,
+ 5558.68,
+ 5754.29,
+ 5926.29],
+ 'size': '1Gb',
+ 'slat': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
+ 'sync': False,
+ 'timings': ('30', '10')}}}
diff --git a/run_test.py b/run_test.py
index d1a9f4f..992c7eb 100755
--- a/run_test.py
+++ b/run_test.py
@@ -1,4 +1,5 @@
import sys
+import json
import Queue
import pprint
import logging
@@ -10,10 +11,11 @@
import utils
import ssh_utils
+import start_vms
from nodes import discover
from nodes.node import Node
from config import cfg_dict
-from itest import IOPerfTest, PgBenchTest
+from tests.itest import IOPerfTest, PgBenchTest
from sensors.api import start_monitoring
@@ -57,15 +59,19 @@
logger.info("Connecting to nodes")
with ThreadPoolExecutor(32) as pool:
list(pool.map(connect_one, nodes))
+ logger.info("All nodes connected successfully")
def save_sensors_data(q):
logger.info("Start receiving sensors data")
+ sensor_data = []
while True:
val = q.get()
if val is None:
+ print sensor_data
+ q.put(sensor_data)
break
- # logger.debug("Sensors -> {0!r}".format(val))
+ sensor_data.append(val)
logger.info("Sensors thread exits")
@@ -90,26 +96,30 @@
res_q = Queue.Queue()
- for name, params in config['tests'].items():
- logger.info("Starting {0} tests".format(name))
+ for test in config['tests']:
+ for name, params in test.items():
+ logger.info("Starting {0} tests".format(name))
- threads = []
- barrier = utils.Barrier(len(test_nodes))
- for node in test_nodes:
- msg = "Starting {0} test on {1} node"
- logger.debug(msg.format(name, node.conn_url))
- test = tool_type_mapper[name](params, res_q.put)
- th = threading.Thread(None, test_thread, None,
- (test, node, barrier))
- threads.append(th)
- th.daemon = True
- th.start()
+ threads = []
+ barrier = utils.Barrier(len(test_nodes))
+ for node in test_nodes:
+ msg = "Starting {0} test on {1} node"
+ logger.debug(msg.format(name, node.conn_url))
+ test = tool_type_mapper[name](params, res_q.put)
+ th = threading.Thread(None, test_thread, None,
+ (test, node, barrier))
+ threads.append(th)
+ th.daemon = True
+ th.start()
- for th in threads:
- th.join()
+ for th in threads:
+ th.join()
- while not res_q.empty():
- logger.info("Get test result {0!r}".format(res_q.get()))
+ results = []
+ while not res_q.empty():
+ results.append(res_q.get())
+ # logger.info("Get test result {0!r}".format(results[-1]))
+ yield name, results
def parse_args(argv):
@@ -120,14 +130,14 @@
action='store_true', default=False,
help="print some extra log info")
- parser.add_argument('stages', nargs="+",
- choices=["discover", "connect", "start_new_nodes",
- "deploy_sensors", "run_tests"])
+ parser.add_argument("-o", '--output-dest', nargs="*")
+ parser.add_argument("config_file", nargs="?", default="config.yaml")
return parser.parse_args(argv[1:])
-def log_nodes_statistic(nodes):
+def log_nodes_statistic(_, ctx):
+ nodes = ctx.nodes
logger.info("Found {0} nodes total".format(len(nodes)))
per_role = collections.defaultdict(lambda: 0)
for node in nodes:
@@ -142,65 +152,132 @@
pass
+def connect_stage(cfg, ctx):
+ ctx.clear_calls_stack.append(disconnect_stage)
+ connect_all(ctx.nodes)
+
+
+def discover_stage(cfg, ctx):
+ if 'discover' in cfg:
+ discover_objs = [i.strip() for i in cfg['discover'].strip().split(",")]
+ ctx.nodes.extend(discover.discover(discover_objs, cfg['clouds']))
+
+ for url, roles in cfg.get('explicit_nodes', {}).items():
+ ctx.nodes.append(Node(url, roles.split(",")))
+
+
+def deploy_sensors_stage(cfg_dict, ctx):
+ ctx.clear_calls_stack.append(remove_sensors_stage)
+ if 'sensors' not in cfg_dict:
+ return
+
+ cfg = cfg_dict.get('sensors')
+ sens_cfg = []
+
+ for role, sensors_str in cfg["roles_mapping"].items():
+ sensors = [sens.strip() for sens in sensors_str.split(",")]
+
+ collect_cfg = dict((sensor, {}) for sensor in sensors)
+
+ for node in ctx.nodes:
+ if role in node.roles:
+ sens_cfg.append((node.connection, collect_cfg))
+
+ log_sensors_config(sens_cfg)
+
+ ctx.sensor_cm = start_monitoring(cfg["receiver_uri"], None,
+ connected_config=sens_cfg)
+
+ ctx.sensors_control_queue = ctx.sensor_cm.__enter__()
+
+ th = threading.Thread(None, save_sensors_data, None,
+ (ctx.sensors_control_queue,))
+ th.daemon = True
+ th.start()
+ ctx.sensor_listen_thread = th
+
+
+def remove_sensors_stage(cfg, ctx):
+ ctx.sensors_control_queue.put(None)
+ ctx.sensor_listen_thread.join()
+ ctx.sensor_data = ctx.sensors_control_queue.get()
+
+
+def run_tests_stage(cfg, ctx):
+ ctx.results = []
+
+ if 'tests' in cfg:
+        ctx.results.extend(run_tests(cfg, ctx.nodes))
+
+ # if 'start_test_nodes' in opts.stages:
+ # params = cfg_dict['start_test_nodes']['openstack']
+ # for new_node in start_vms.launch_vms(params):
+ # new_node.roles.append('testnode')
+ # nodes.append(new_node)
+
+
+def disconnect_stage(cfg, ctx):
+ for node in ctx.nodes:
+ if node.connection is not None:
+ node.connection.close()
+
+
+def report_stage(cfg, ctx):
+ output_dest = cfg.get('output_dest')
+ if output_dest is not None:
+ with open(output_dest, "w") as fd:
+ data = {"sensor_data": ctx.sensor_data,
+ "results": ctx.results}
+ fd.write(json.dumps(data))
+ else:
+ print "=" * 20 + " RESULTS " + "=" * 20
+ pprint.pprint(ctx.results)
+ print "=" * 60
+
+
+def complete_log_nodes_statistic(cfg, ctx):
+ nodes = ctx.nodes
+ for node in nodes:
+ logger.debug(str(node))
+
+
+class Context(object):
+ def __init__(self):
+ self.nodes = []
+ self.clear_calls_stack = []
+
+
def main(argv):
opts = parse_args(argv)
level = logging.DEBUG if opts.extra_logs else logging.WARNING
setup_logger(logger, level)
- nodes = []
+ stages = [
+ discover_stage,
+ connect_stage,
+ complete_log_nodes_statistic,
+ # deploy_sensors_stage,
+ run_tests_stage,
+ report_stage
+ ]
- if 'discover' in opts.stages:
- logger.info("Start node discovery")
- nodes = discover.discover(cfg_dict.get('discover'))
+ ctx = Context()
+ try:
+ for stage in stages:
+ logger.info("Start {0.__name__} stage".format(stage))
+ stage(cfg_dict, ctx)
+ finally:
+        exc_cls, exc_val, tb = sys.exc_info()
+ for stage in ctx.clear_calls_stack[::-1]:
+ try:
+ logger.info("Start {0.__name__} stage".format(stage))
+ stage(cfg_dict, ctx)
+            except:  # one failing cleanup stage must not abort the rest
+ pass
- if 'explicit_nodes' in cfg_dict:
- for url, roles in cfg_dict['explicit_nodes'].items():
- nodes.append(Node(url, roles.split(",")))
-
- log_nodes_statistic(nodes)
-
- if 'connect' in opts.stages:
- connect_all(nodes)
-
- if 'deploy_sensors' in opts.stages:
- logger.info("Deploing sensors")
- cfg = cfg_dict.get('sensors')
- sens_cfg = []
-
- for role, sensors_str in cfg["roles_mapping"].items():
- sensors = [sens.strip() for sens in sensors_str.split(",")]
-
- collect_cfg = dict((sensor, {}) for sensor in sensors)
-
- for node in nodes:
- if role in node.roles:
- sens_cfg.append((node.connection, collect_cfg))
-
- log_sensors_config(sens_cfg)
-
- sensor_cm = start_monitoring(cfg["receiver_uri"], None,
- connected_config=sens_cfg)
-
- with sensor_cm as sensors_control_queue:
- th = threading.Thread(None, save_sensors_data, None,
- (sensors_control_queue,))
- th.daemon = True
- th.start()
-
- # TODO: wait till all nodes start to send sensors data
-
- if 'run_tests' in opts.stages:
- run_tests(cfg_dict, nodes)
-
- sensors_control_queue.put(None)
- th.join()
- elif 'run_tests' in opts.stages:
- run_tests(cfg_dict, nodes)
-
- logger.info("Disconnecting")
- for node in nodes:
- node.connection.close()
+        if exc_cls is not None:
+            raise exc_cls, exc_val, tb
return 0
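The stage pipeline above treats a stage as any callable with the signature stage(cfg, ctx); stages that need cleanup push a matching teardown onto ctx.clear_calls_stack, which is drained in reverse order in the finally block. A minimal sketch of the contract (stage names hypothetical):

    class Context(object):
        def __init__(self):
            self.nodes = []
            self.clear_calls_stack = []

    def teardown_stage(cfg, ctx):
        ctx.nodes = []

    def setup_stage(cfg, ctx):
        # register teardown first, so it runs even if setup fails half-way
        ctx.clear_calls_stack.append(teardown_stage)
        ctx.nodes.append("node-1")

    ctx = Context()
    try:
        for stage in [setup_stage]:
            stage({}, ctx)
    finally:
        for stage in ctx.clear_calls_stack[::-1]:
            stage({}, ctx)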
diff --git a/scripts/agent.py b/scripts/agent.py
index 56189f9..2873cb5 100644
--- a/scripts/agent.py
+++ b/scripts/agent.py
@@ -22,10 +22,10 @@
out, err = p.communicate()
- if not out is None:
+ if out is not None:
print out
- if not err is None:
+ if err is not None:
print err
node_port[ip] = base_port
@@ -35,18 +35,12 @@
def parse_command_line(argv):
- parser = argparse.ArgumentParser(description=
- "Connect to fuel master "
+ parser = argparse.ArgumentParser(description="Connect to fuel master " +
"and setup ssh agent")
- parser.add_argument(
- "--base_port", type=int, required=True)
-
+ parser.add_argument("--base_port", type=int, required=True)
# To do: fix clean to be False when string is False
- parser.add_argument(
- "--clean", type=bool, default=False)
-
- parser.add_argument(
- "--ports", type=str, nargs='+')
+ parser.add_argument("--clean", type=bool, default=False)
+ parser.add_argument("--ports", type=str, nargs='+')
return parser.parse_args(argv)
diff --git a/scripts/connector.py b/scripts/connector.py
index 4d9d32f..15bc538 100644
--- a/scripts/connector.py
+++ b/scripts/connector.py
@@ -6,7 +6,7 @@
from urlparse import urlparse
-from keystone import KeystoneAuth
+from disk_perf_test_tool.keystone import KeystoneAuth
def discover_fuel_nodes(fuel_url, creds, cluster_id):
@@ -92,17 +92,11 @@
def parse_command_line(argv):
parser = argparse.ArgumentParser(
description="Connect to fuel master and setup ssh agent")
- parser.add_argument(
- "--fuel_url", required=True)
- parser.add_argument(
- "--cluster_id", required=True)
- parser.add_argument(
- "--username", default="admin")
- parser.add_argument(
- "--tenantname", default="admin")
- parser.add_argument(
- "--password", default="admin")
-
+ parser.add_argument("--fuel-url", required=True)
+ parser.add_argument("--cluster-id", required=True)
+ parser.add_argument("--username", default="admin")
+ parser.add_argument("--tenantname", default="admin")
+ parser.add_argument("--password", default="admin")
return parser.parse_args(argv)
@@ -113,6 +107,8 @@
"password": args.password}
nodes = discover_fuel_nodes(args.fuel_url, creds, args.cluster_id)
+ print "Ready", nodes
+ sys.stdin.readline()
discover_fuel_nodes_clean(args.fuel_url, {"username": "root",
"password": "test37",
"port": 22}, nodes)
diff --git a/scripts/data2.py b/scripts/data2.py
index 08dbc77..4a8dad9 100644
--- a/scripts/data2.py
+++ b/scripts/data2.py
@@ -1,6 +1,6 @@
import sys
-import math
-import itertools
+from data_stat import med_dev, round_deviation, groupby_globally
+from data_stat import read_data_agent_result
def key(x):
@@ -10,36 +10,6 @@
x['__meta__']['concurence'])
-def med_dev(vals):
- med = sum(vals) / len(vals)
- dev = ((sum(abs(med - i) ** 2 for i in vals) / len(vals)) ** 0.5)
- return int(med), int(dev)
-
-
-def round_deviation(med_dev):
- med, dev = med_dev
-
- if dev < 1E-7:
- return med_dev
-
- dev_div = 10.0 ** (math.floor(math.log10(dev)) - 1)
- dev = int(dev / dev_div) * dev_div
- med = int(med / dev_div) * dev_div
- return (type(med_dev[0])(med),
- type(med_dev[1])(dev))
-
-
-def groupby_globally(data, key_func):
- grouped = {}
- grouped_iter = itertools.groupby(data, key_func)
-
- for (bs, cache_tp, act, conc), curr_data_it in grouped_iter:
- key = (bs, cache_tp, act, conc)
- grouped.setdefault(key, []).extend(curr_data_it)
-
- return grouped
-
-
template = "{bs:>4} {action:>12} {cache_tp:>3} {conc:>4}"
template += " | {iops[0]:>6} ~ {iops[1]:>5} | {bw[0]:>7} ~ {bw[1]:>6}"
template += " | {lat[0]:>6} ~ {lat[1]:>5} |"
@@ -54,21 +24,7 @@
def main(argv):
- data = []
-
- with open(argv[1]) as fc:
- block = None
- for line in fc:
- if line.startswith("{'__meta__':"):
- block = line
- elif block is not None:
- block += line
-
- if block is not None:
- if block.count('}') == block.count('{'):
- data.append(eval(block))
- block = None
-
+ data = read_data_agent_result(sys.argv[1])
grouped = groupby_globally(data, key)
print template.format(**headers)
diff --git a/scripts/data_stat.py b/scripts/data_stat.py
new file mode 100644
index 0000000..a4d46bf
--- /dev/null
+++ b/scripts/data_stat.py
@@ -0,0 +1,49 @@
+import math
+import itertools
+
+
+def med_dev(vals):
+ med = sum(vals) / len(vals)
+ dev = ((sum(abs(med - i) ** 2 for i in vals) / len(vals)) ** 0.5)
+ return int(med), int(dev)
+
+
+def round_deviation(med_dev):
+ med, dev = med_dev
+
+ if dev < 1E-7:
+ return med_dev
+
+ dev_div = 10.0 ** (math.floor(math.log10(dev)) - 1)
+ dev = int(dev / dev_div) * dev_div
+ med = int(med / dev_div) * dev_div
+ return (type(med_dev[0])(med),
+ type(med_dev[1])(dev))
+
+
+def groupby_globally(data, key_func):
+ grouped = {}
+ grouped_iter = itertools.groupby(data, key_func)
+
+ for (bs, cache_tp, act, conc), curr_data_it in grouped_iter:
+ key = (bs, cache_tp, act, conc)
+ grouped.setdefault(key, []).extend(curr_data_it)
+
+ return grouped
+
+
+def read_data_agent_result(fname):
+ data = []
+ with open(fname) as fc:
+ block = None
+ for line in fc:
+ if line.startswith("{'__meta__':"):
+ block = line
+ elif block is not None:
+ block += line
+
+ if block is not None:
+ if block.count('}') == block.count('{'):
+ data.append(eval(block))
+ block = None
+ return data
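A worked example of the helpers above: med_dev() returns an integer mean and deviation, and round_deviation() keeps two significant digits of the deviation and rounds the mean to match (run next to scripts/data_stat.py):

    from data_stat import med_dev, round_deviation

    print med_dev([249, 220, 220, 209, 209, 214, 211])  # -> (218, 13)
    print round_deviation((4522, 137))                  # -> (4520, 130)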
diff --git a/scripts/disk_io_pp.py b/scripts/disk_io_pp.py
new file mode 100644
index 0000000..bf901d3
--- /dev/null
+++ b/scripts/disk_io_pp.py
@@ -0,0 +1,60 @@
+import sys
+import collections
+
+import scipy.stats as stats
+import matplotlib.mlab as mlab
+import matplotlib.pyplot as plt
+
+from data_stat import med_dev, round_deviation
+from data_stat import read_data_agent_result
+
+data = read_data_agent_result(sys.argv[1])
+
+# for run in data:
+# for name, numbers in run['res'].items():
+# # med, dev = round_deviation(med_dev(numbers['iops']))
+# # print name, med, '~', dev
+# distr = collections.defaultdict(lambda: 0.0)
+# for i in numbers['iops']:
+# distr[i] += 1
+
+# print name
+# for key, val in sorted(distr.items()):
+# print " ", key, val
+# print
+
+
+
+# # example data
+# mu = 100 # mean of distribution
+# sigma = 15 # standard deviation of distribution
+# x = mu + sigma * np.random.randn(10000)
+
+x = data[0]['res'][sys.argv[2]]['iops']
+# mu, sigma = med_dev(x)
+# print mu, sigma
+
+# med_sz = 1
+# x2 = x[:len(x) // med_sz * med_sz]
+# x2 = [sum(vals) / len(vals) for vals in zip(*[x2[i::med_sz]
+# for i in range(med_sz)])]
+
+mu, sigma = med_dev(x)
+print mu, sigma
+print stats.normaltest(x)
+
+num_bins = 20
+# the histogram of the data
+n, bins, patches = plt.hist(x, num_bins, normed=1, facecolor='green', alpha=0.5)
+# add a 'best fit' line
+
+y = mlab.normpdf(bins, mu, sigma)
+plt.plot(bins, y, 'r--')
+
+plt.xlabel('IOPS')
+plt.ylabel('Probability')
+plt.title(r'Histogram of IOPS: $\mu={}$, $\sigma={}$'.format(mu, sigma))
+
+# Tweak spacing to prevent clipping of ylabel
+plt.subplots_adjust(left=0.15)
+plt.show()
diff --git a/sensors/deploy_sensors.py b/sensors/deploy_sensors.py
index e0428d9..3c715ef 100644
--- a/sensors/deploy_sensors.py
+++ b/sensors/deploy_sensors.py
@@ -88,6 +88,8 @@
if isinstance(uri_or_conn, basestring):
conn.close()
+ logger.debug("Sensors stopped and removed")
+
def stop_and_remove_sensors(config, remote_path='/tmp/sensors',
connected_config=None):
diff --git a/start_vms.py b/start_vms.py
index 16e7d08..81f3e81 100644
--- a/start_vms.py
+++ b/start_vms.py
@@ -8,6 +8,8 @@
from novaclient.client import Client as n_client
from cinderclient.v1.client import Client as c_client
+from nodes.node import Node
+from nodes.openstack import get_floating_ip
logger = logging.getLogger("io-perf-tool")
@@ -81,6 +83,28 @@
return [ip for ip in ip_list if ip.instance_id is None][:amount]
+def launch_vms(config):
+ creds = config['creds']
+ if creds != 'ENV':
+ raise ValueError("Only 'ENV' creds are supported")
+
+ logger.debug("Starting new nodes on openstack")
+ conn = nova_connect()
+ params = config['vm_params'].copy()
+ count = params.pop('count')
+
+ if isinstance(count, basestring):
+ assert count.startswith("x")
+ lst = conn.services.list(binary='nova-compute')
+ srv_count = len([srv for srv in lst if srv.status == 'enabled'])
+ count = srv_count * int(count[1:])
+
+ creds = params.pop('creds')
+
+ for ip, _ in create_vms_mt(conn, count, **params):
+ yield Node(creds.format(ip), [])
+
+
def create_vms_mt(nova, amount, keypair_name, img_name,
flavor_name, vol_sz=None, network_zone_name=None,
flt_ip_pool=None, name_templ='ceph-test-{0}',
@@ -99,10 +123,6 @@
if flt_ip_pool is not None:
ips_future = executor.submit(get_floating_ips,
nova, flt_ip_pool, amount)
- else:
- ips_future = None
-
- if ips_future is not None:
logger.debug("Wait for floating ip")
ips = ips_future.result()
ips += [Allocate] * (amount - len(ips))
@@ -169,11 +189,12 @@
if flt_ip is Allocate:
flt_ip = nova.floating_ips.create(pool)
+
if flt_ip is not None:
# print "attaching ip to server"
srv.add_floating_ip(flt_ip)
- return nova.servers.get(srv.id)
+ return flt_ip.ip, nova.servers.get(srv.id)
def clear_all(nova, name_templ="ceph-test-{0}"):
diff --git a/storage_api.py b/storage_api.py
index 1dbf30c..d02d0cd 100644
--- a/storage_api.py
+++ b/storage_api.py
@@ -1,5 +1,3 @@
-from urlparse import urlparse
-
import json
import math
from config import TEST_PATH
diff --git a/tests.yaml b/tests.yaml
new file mode 100644
index 0000000..a90c628
--- /dev/null
+++ b/tests.yaml
@@ -0,0 +1,26 @@
+- with_test_nodes:
+ openstack:
+ creds: ENV
+ # creds: FUEL://USER:PASSDW@172.16.52.112:8000/ENV_NAME
+ vm_params:
+ count: x1
+ img_name: disk_io_perf
+ flavor_name: disk_io_perf.256
+ keypair_name: disk_io_perf
+ network_zone_name: novanetwork
+ flt_ip_pool: nova
+ creds: "ssh://ubuntu@{0}::disk_io_perf.pem"
+ tests:
+ - pgbench:
+ opts:
+ num_clients: [4, 8, 12]
+ transactions: [1, 2, 3]
+ - io:
+ tool: fio
+ config_file: tests/io_task_test.cfg
+
+ - vm_count:
+ max_lat_ms: 20
+ min_bw_mbps: 60
+ min_4k_direct_w_iops: 100
+ min_4k_direct_r_iops: 100
diff --git a/tests/disk_test_agent.py b/tests/disk_test_agent.py
new file mode 100644
index 0000000..7537438
--- /dev/null
+++ b/tests/disk_test_agent.py
@@ -0,0 +1,457 @@
+import sys
+import time
+import json
+import select
+import pprint
+import argparse
+import traceback
+import subprocess
+import itertools
+from collections import OrderedDict
+
+
+SECTION = 0
+SETTING = 1
+
+
+def get_test_summary(params):
+ rw = {"randread": "rr",
+ "randwrite": "rw",
+ "read": "sr",
+ "write": "sw"}[params["rw"]]
+
+ if params.get("direct") == '1':
+ sync_mode = 'd'
+ elif params.get("sync") == '1':
+ sync_mode = 's'
+ else:
+ sync_mode = 'a'
+
+ th_count = int(params.get('numjobs', '1'))
+
+ return "{0}{1}{2}th{3}".format(rw, sync_mode,
+ params['blocksize'], th_count)
+
+
+counter = [0]
+
+
+def process_section(name, vals, defaults, format_params):
+ vals = vals.copy()
+ params = format_params.copy()
+
+ if '*' in name:
+ name, repeat = name.split('*')
+ name = name.strip()
+ repeat = int(repeat.format(**params))
+ else:
+ repeat = 1
+
+ # this code can be optimized
+ for i in range(repeat):
+ iterable_names = []
+ iterable_values = []
+ processed_vals = {}
+
+ for val_name, val in vals.items():
+ if val is None:
+ processed_vals[val_name] = val
+ # remove hardcode
+ elif val.startswith('{%'):
+ assert val.endswith("%}")
+ content = val[2:-2].format(**params)
+ iterable_names.append(val_name)
+ iterable_values.append(i.strip() for i in content.split(','))
+ else:
+ processed_vals[val_name] = val.format(**params)
+
+ if iterable_values == []:
+ params['UNIQ'] = 'UN{0}'.format(counter[0])
+ counter[0] += 1
+ params['TEST_SUMM'] = get_test_summary(processed_vals)
+ yield name.format(**params), processed_vals
+ else:
+ for it_vals in itertools.product(*iterable_values):
+ processed_vals.update(dict(zip(iterable_names, it_vals)))
+ params['UNIQ'] = 'UN{0}'.format(counter[0])
+ counter[0] += 1
+ params['TEST_SUMM'] = get_test_summary(processed_vals)
+ yield name.format(**params), processed_vals
+
+
+def calculate_execution_time(combinations):
+ time = 0
+ for _, params in combinations:
+ time += int(params.get('ramp_time', 0))
+ time += int(params.get('runtime', 0))
+ return time
+
+
+def parse_fio_config_full(fio_cfg, params=None):
+ defaults = {}
+ format_params = {}
+
+ if params is None:
+ ext_params = {}
+ else:
+ ext_params = params.copy()
+
+ curr_section = None
+ curr_section_name = None
+
+ for tp, name, val in parse_fio_config_iter(fio_cfg):
+ if tp == SECTION:
+ non_def = curr_section_name != 'defaults'
+ if curr_section_name is not None and non_def:
+ format_params.update(ext_params)
+ for sec in process_section(curr_section_name,
+ curr_section,
+ defaults,
+ format_params):
+ yield sec
+
+ if name == 'defaults':
+ curr_section = defaults
+ else:
+ curr_section = OrderedDict()
+ curr_section.update(defaults)
+ curr_section_name = name
+
+ else:
+ assert tp == SETTING
+ assert curr_section_name is not None, "no section name"
+ if name == name.upper():
+ assert curr_section_name == 'defaults'
+ format_params[name] = val
+ else:
+ curr_section[name] = val
+
+ if curr_section_name is not None and curr_section_name != 'defaults':
+ format_params.update(ext_params)
+ for sec in process_section(curr_section_name,
+ curr_section,
+ defaults,
+ format_params):
+ yield sec
+
+
+def parse_fio_config_iter(fio_cfg):
+ for lineno, line in enumerate(fio_cfg.split("\n")):
+ try:
+ line = line.strip()
+
+ if line.startswith("#") or line.startswith(";"):
+ continue
+
+ if line == "":
+ continue
+
+ if line.startswith('['):
+            assert line.endswith(']'), "section name should end with ']'"
+ yield SECTION, line[1:-1], None
+ elif '=' in line:
+ opt_name, opt_val = line.split('=', 1)
+ yield SETTING, opt_name.strip(), opt_val.strip()
+ else:
+ yield SETTING, line, None
+ except Exception as exc:
+ pref = "During parsing line number {0}\n".format(lineno)
+ raise ValueError(pref + exc.message)
+
+
+def format_fio_config(fio_cfg):
+ res = ""
+ for pos, (name, section) in enumerate(fio_cfg):
+ if pos != 0:
+ res += "\n"
+
+ res += "[{0}]\n".format(name)
+ for opt_name, opt_val in section.items():
+ if opt_val is None:
+ res += opt_name + "\n"
+ else:
+ res += "{0}={1}\n".format(opt_name, opt_val)
+ return res
+
+
+def do_run_fio(bconf):
+ benchmark_config = format_fio_config(bconf)
+ cmd = ["fio", "--output-format=json", "-"]
+ p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+
+ # set timeout
+ raw_out, _ = p.communicate(benchmark_config)
+
+ try:
+ parsed_out = json.loads(raw_out)["jobs"]
+ except Exception:
+ msg = "Can't parse fio output: {0!r}\nError: {1}"
+ raise ValueError(msg.format(raw_out, traceback.format_exc()))
+
+ return zip(parsed_out, bconf)
+
+
+# limited by fio
+MAX_JOBS = 1000
+
+
+def next_test_portion(whole_conf, runcycle):
+ jcount = 0
+ runtime = 0
+ bconf = []
+
+ for pos, (name, sec) in enumerate(whole_conf):
+ jc = int(sec.get('numjobs', '1'))
+
+ if runcycle is not None:
+ curr_task_time = calculate_execution_time([(name, sec)])
+ else:
+ curr_task_time = 0
+
+ if jc > MAX_JOBS:
+            err_templ = "Can't process job {0!r} - numjobs is too large"
+ raise ValueError(err_templ.format(name))
+
+ if runcycle is not None and len(bconf) != 0:
+ rc_ok = curr_task_time + runtime <= runcycle
+ else:
+ rc_ok = True
+
+ if jc + jcount <= MAX_JOBS and rc_ok:
+ runtime += curr_task_time
+ jcount += jc
+ bconf.append((name, sec))
+ continue
+
+ assert len(bconf) != 0
+ yield bconf
+
+ runtime = curr_task_time
+ jcount = jc
+ bconf = [(name, sec)]
+
+ if bconf != []:
+ yield bconf
+
+
+def add_job_results(jname, job_output, jconfig, res):
+ if job_output['write']['iops'] != 0:
+ raw_result = job_output['write']
+ else:
+ raw_result = job_output['read']
+
+ if jname not in res:
+ j_res = {}
+ j_res["action"] = jconfig["rw"]
+ j_res["direct_io"] = jconfig.get("direct", "0") == "1"
+ j_res["sync"] = jconfig.get("sync", "0") == "1"
+ j_res["concurence"] = int(jconfig.get("numjobs", 1))
+ j_res["size"] = jconfig["size"]
+ j_res["jobname"] = job_output["jobname"]
+ j_res["timings"] = (jconfig.get("runtime"),
+ jconfig.get("ramp_time"))
+ else:
+ j_res = res[jname]
+ assert j_res["action"] == jconfig["rw"]
+
+ assert j_res["direct_io"] == \
+ (jconfig.get("direct", "0") == "1")
+
+ assert j_res["sync"] == (jconfig.get("sync", "0") == "1")
+ assert j_res["concurence"] == int(jconfig.get("numjobs", 1))
+ assert j_res["size"] == jconfig["size"]
+ assert j_res["jobname"] == job_output["jobname"]
+ assert j_res["timings"] == (jconfig.get("runtime"),
+ jconfig.get("ramp_time"))
+
+ def j_app(name, x):
+ j_res.setdefault(name, []).append(x)
+
+ # 'bw_dev bw_mean bw_max bw_min'.split()
+ j_app("bw_mean", raw_result["bw_mean"])
+ j_app("iops", raw_result["iops"])
+ j_app("lat", raw_result["lat"]["mean"])
+ j_app("clat", raw_result["clat"]["mean"])
+ j_app("slat", raw_result["slat"]["mean"])
+
+ res[jname] = j_res
+
+
+def run_fio(benchmark_config,
+ params,
+ runcycle=None,
+ raw_results_func=None,
+ skip_tests=0):
+
+ whole_conf = list(parse_fio_config_full(benchmark_config, params))
+ whole_conf = whole_conf[skip_tests:]
+ res = {}
+ curr_test_num = skip_tests
+    executed_tests = 0
+ try:
+ for bconf in next_test_portion(whole_conf, runcycle):
+ res_cfg_it = do_run_fio(bconf)
+ res_cfg_it = enumerate(res_cfg_it, curr_test_num)
+
+ for curr_test_num, (job_output, (jname, jconfig)) in res_cfg_it:
+                executed_tests += 1
+ if raw_results_func is not None:
+ raw_results_func(curr_test_num,
+ [job_output, jname, jconfig])
+
+ assert jname == job_output["jobname"]
+
+ if jname.startswith('_'):
+ continue
+
+ add_job_results(jname, job_output, jconfig, res)
+
+ except (SystemExit, KeyboardInterrupt):
+ pass
+
+ except Exception:
+ traceback.print_exc()
+
+    return res, executed_tests
+
+
+def run_benchmark(binary_tp, *argv, **kwargs):
+ if 'fio' == binary_tp:
+ return run_fio(*argv, **kwargs)
+ raise ValueError("Unknown behcnmark {0}".format(binary_tp))
+
+
+def parse_args(argv):
+ parser = argparse.ArgumentParser(
+ description="Run fio' and return result")
+ parser.add_argument("--type", metavar="BINARY_TYPE",
+ choices=['fio'], default='fio',
+ help=argparse.SUPPRESS)
+ parser.add_argument("--start-at", metavar="START_AT_UTC", type=int,
+ help="Start execution at START_AT_UTC")
+ parser.add_argument("--json", action="store_true", default=False,
+ help="Json output format")
+ parser.add_argument("--output", default='-', metavar="FILE_PATH",
+ help="Store results to FILE_PATH")
+ parser.add_argument("--estimate", action="store_true", default=False,
+ help="Only estimate task execution time")
+ parser.add_argument("--compile", action="store_true", default=False,
+ help="Compile config file to fio config")
+ parser.add_argument("--num-tests", action="store_true", default=False,
+ help="Show total number of tests")
+ parser.add_argument("--runcycle", type=int, default=None,
+ metavar="MAX_CYCLE_SECONDS",
+ help="Max cycle length in seconds")
+ parser.add_argument("--show-raw-results", action='store_true',
+ default=False, help="Output raw input and results")
+ parser.add_argument("--skip-tests", type=int, default=0, metavar="NUM",
+ help="Skip NUM tests")
+ parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
+ default=[],
+                        help="Provide set of pairs PARAM=VAL to " +
+ "format into job description")
+ parser.add_argument("jobfile")
+ return parser.parse_args(argv)
+
+
+def read_config(fd, timeout=10):
+ job_cfg = ""
+ etime = time.time() + timeout
+ while True:
+ wtime = etime - time.time()
+ if wtime <= 0:
+ raise IOError("No config provided")
+
+ r, w, x = select.select([fd], [], [], wtime)
+ if len(r) == 0:
+ raise IOError("No config provided")
+
+ char = fd.read(1)
+ if '' == char:
+ return job_cfg
+
+ job_cfg += char
+
+
+def main(argv):
+ argv_obj = parse_args(argv)
+
+ if argv_obj.jobfile == '-':
+ job_cfg = read_config(sys.stdin)
+ else:
+ job_cfg = open(argv_obj.jobfile).read()
+
+ if argv_obj.output == '-':
+ out_fd = sys.stdout
+ else:
+ out_fd = open(argv_obj.output, "w")
+
+ params = {}
+ for param_val in argv_obj.params:
+ assert '=' in param_val
+ name, val = param_val.split("=", 1)
+ params[name] = val
+
+ if argv_obj.num_tests or argv_obj.compile or argv_obj.estimate:
+ bconf = list(parse_fio_config_full(job_cfg, params))
+ bconf = bconf[argv_obj.skip_tests:]
+
+ if argv_obj.compile:
+ out_fd.write(format_fio_config(bconf))
+ out_fd.write("\n")
+
+ if argv_obj.num_tests:
+ print len(bconf)
+
+ if argv_obj.estimate:
+ seconds = calculate_execution_time(bconf)
+
+ h = seconds // 3600
+ m = (seconds % 3600) // 60
+ s = seconds % 60
+
+ print "{0}:{1}:{2}".format(h, m, s)
+ return 0
+
+ if argv_obj.start_at is not None:
+ ctime = time.time()
+ if argv_obj.start_at >= ctime:
+            time.sleep(argv_obj.start_at - ctime)
+
+ def raw_res_func(test_num, data):
+ pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
+ out_fd.write(pref)
+ out_fd.write(json.dumps(data))
+ out_fd.write("\n========= END OF RAW_RESULTS =========\n")
+ out_fd.flush()
+
+ rrfunc = raw_res_func if argv_obj.show_raw_results else None
+
+ stime = time.time()
+ job_res, num_tests = run_benchmark(argv_obj.type,
+ job_cfg,
+ params,
+ argv_obj.runcycle,
+ rrfunc,
+ argv_obj.skip_tests)
+ etime = time.time()
+
+ res = {'__meta__': {'raw_cfg': job_cfg}, 'res': job_res}
+
+ oformat = 'json' if argv_obj.json else 'eval'
+ out_fd.write("\nRun {} tests in {} seconds\n".format(num_tests,
+ int(etime - stime)))
+ out_fd.write("========= RESULTS(format={0}) =========\n".format(oformat))
+ if argv_obj.json:
+ out_fd.write(json.dumps(res))
+ else:
+ out_fd.write(pprint.pformat(res) + "\n")
+ out_fd.write("\n========= END OF RESULTS =========\n".format(oformat))
+
+ return 0
+
+
+if __name__ == '__main__':
+ exit(main(sys.argv[1:]))
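To recap the config template syntax the agent implements: [name * N] repeats a section N times, {% a, b %} values expand into a cartesian product, and UPPERCASE settings from the [defaults] section become format parameters alongside the generated {UNIQ} and {TEST_SUMM}. A small self-check (run from the tests/ directory; the parser strips per-line indentation):

    from disk_test_agent import parse_fio_config_full

    cfg = """
    [defaults]
    NUM_ROUNDS=2
    runtime=10

    [test_{TEST_SUMM}_{UNIQ} * {NUM_ROUNDS}]
    blocksize={% 4k, 1m %}
    rw=randwrite
    direct=1
    """

    # 2 rounds x 2 block sizes -> 4 generated sections
    sections = list(parse_fio_config_full(cfg))
    print len(sections)   # 4
    print sections[0][0]  # e.g. test_rwd4kth1_UN0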
diff --git a/tests/fio_configs/1.cfg b/tests/fio_configs/1.cfg
new file mode 100644
index 0000000..d5240cd
--- /dev/null
+++ b/tests/fio_configs/1.cfg
@@ -0,0 +1,100 @@
+[writetest_10 * 55]
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=10
+time_based
+wait_for_previous
+
+[writetest_20 * 55]
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=20
+time_based
+wait_for_previous
+
+[writetest_30 * 55]
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=30
+time_based
+wait_for_previous
+
+[writetest_120 * 55]
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=120
+time_based
+wait_for_previous
+
+[writetest_30_5 * 55]
+ramp_time=5
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=30
+time_based
+wait_for_previous
+
+[writetest_30_10 * 55]
+ramp_time=10
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=30
+time_based
+wait_for_previous
+
+[writetest_30_15 * 55]
+ramp_time=15
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=30
+time_based
+wait_for_previous
diff --git a/tests/fio_configs/2.cfg b/tests/fio_configs/2.cfg
new file mode 100644
index 0000000..050e477
--- /dev/null
+++ b/tests/fio_configs/2.cfg
@@ -0,0 +1,13 @@
+[writetest_10_20 * 3]
+ramp_time=5
+numjobs=1
+blocksize=4k
+filename={FILENAME}
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=5
+time_based
+wait_for_previous
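The {FILENAME} placeholder above is a format parameter supplied on the agent command line (see --params in disk_test_agent.parse_args); e.g., to estimate the run time of this config without executing it (assuming it is run from the repo root):

    python2 tests/disk_test_agent.py --estimate tests/fio_configs/2.cfg --params FILENAME=/tmp/xxx.bin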
diff --git a/tests/io.py b/tests/io.py
deleted file mode 100644
index 2405f53..0000000
--- a/tests/io.py
+++ /dev/null
@@ -1,97 +0,0 @@
-import sys
-import time
-import json
-import select
-import pprint
-import argparse
-import subprocess
-from StringIO import StringIO
-from ConfigParser import RawConfigParser
-
-
-def run_fio(benchmark_config):
- cmd = ["fio", "--output-format=json", "-"]
- p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- raw_out, _ = p.communicate(benchmark_config)
- job_output = json.loads(raw_out)["jobs"][0]
-
- if job_output['write']['iops'] != 0:
- raw_result = job_output['write']
- else:
- raw_result = job_output['read']
-
- res = {}
-
- # 'bw_dev bw_mean bw_max bw_min'.split()
- for field in ["bw_mean", "iops"]:
- res[field] = raw_result[field]
-
- res["lat"] = raw_result["lat"]["mean"]
- res["clat"] = raw_result["clat"]["mean"]
- res["slat"] = raw_result["slat"]["mean"]
- res["util"] = json.loads(raw_out)["disk_util"][0]
-
- res["util"] = dict((str(k), v) for k, v in res["util"].items())
-
- return res
-
-
-def run_benchmark(binary_tp, *argv, **kwargs):
- if 'fio' == binary_tp:
- return run_fio(*argv, **kwargs)
- raise ValueError("Unknown behcnmark {0}".format(binary_tp))
-
-
-def parse_args(argv):
- parser = argparse.ArgumentParser(
- description="Run fio' and return result")
- parser.add_argument(
- "--type", metavar="BINARY_TYPE",
- choices=['fio'], required=True)
- parser.add_argument("--start-at", metavar="START_TIME", type=int)
- parser.add_argument("--json", action="store_true", default=False)
- parser.add_argument("jobfile")
- return parser.parse_args(argv)
-
-
-def main(argv):
- argv_obj = parse_args(argv)
- if argv_obj.jobfile == '-':
- job_cfg = ""
- dtime = 10
- while True:
- r, w, x = select.select([sys.stdin], [], [], dtime)
- if len(r) == 0:
- raise IOError("No config provided")
- char = sys.stdin.read(1)
- if '' == char:
- break
- job_cfg += char
- dtime = 1
- else:
- job_cfg = open(argv_obj.jobfile).read()
-
- rcp = RawConfigParser()
- rcp.readfp(StringIO(job_cfg))
- assert len(rcp.sections()) == 1
-
- if argv_obj.start_at is not None:
- ctime = time.time()
- if argv_obj.start_at >= ctime:
- time.sleep(ctime - argv_obj.start_at)
-
- res = run_benchmark(argv_obj.type, job_cfg)
- res['__meta__'] = dict(rcp.items(rcp.sections()[0]))
- res['__meta__']['raw'] = job_cfg
-
- if argv_obj.json:
- sys.stdout.write(json.dumps(res))
- else:
- sys.stdout.write(pprint.pformat(res))
- sys.stdout.write("\n")
- return 0
-
-if __name__ == '__main__':
- exit(main(sys.argv[1:]))
diff --git a/tests/io_scenario_check_assumptions.cfg b/tests/io_scenario_check_assumptions.cfg
new file mode 100644
index 0000000..25d99bc
--- /dev/null
+++ b/tests/io_scenario_check_assumptions.cfg
@@ -0,0 +1,59 @@
+[defaults]
+NUM_ROUNDS=7
+
+ramp_time=5
+buffered=0
+wait_for_previous
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+
+# ---------------------------------------------------------------------
+# check test time, no warmup time. iops = func(run_time)
+# ---------------------------------------------------------------------
+[runtime_test_wo_wu_{TEST_SUMM}_{UNIQ} * {NUM_ROUNDS}]
+startdelay=10
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=1
+ramp_time=0
+runtime={% 10, 15, 20, 30, 60, 120 %}
+
+# ---------------------------------------------------------------------
+# check test time, with warmup time. iops = func(run_time)
+# ---------------------------------------------------------------------
+[runtime_test_w_wu_{TEST_SUMM}_{UNIQ} * {NUM_ROUNDS}]
+startdelay=10
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=1
+ramp_time={% 5, 10, 15 %}
+runtime={% 15, 30 %}
+
+# ---------------------------------------------------------------------
+# check read and write linearity. oper_time = func(size)
+# ---------------------------------------------------------------------
+[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
+rw={% randwrite, randread %}
+direct=1
+
+# ---------------------------------------------------------------------
+# check sync write linearity. oper_time = func(size)
+# check sync BW as well
+# ---------------------------------------------------------------------
+[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
+rw=randread
+sync=1
+
+# ---------------------------------------------------------------------
+# check different thread count. (latency, bw) = func(th_count)
+# ---------------------------------------------------------------------
+[concurrence_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=1
+numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
diff --git a/io_task.cfg b/tests/io_task.cfg
similarity index 100%
rename from io_task.cfg
rename to tests/io_task.cfg
diff --git a/tests/io_task_test.cfg b/tests/io_task_test.cfg
new file mode 100644
index 0000000..a319fa0
--- /dev/null
+++ b/tests/io_task_test.cfg
@@ -0,0 +1,24 @@
+# [__warmup]
+# blocksize=4k
+# filename=/tmp/xxx.bin
+# rw=randwrite
+# direct=1
+# buffered=0
+# iodepth=1
+# size=1Gb
+# runtime=5
+# time_based
+
+[writetest * 3]
+numjobs=4
+wait_for_previous
+ramp_time=5
+blocksize=4k
+filename=/tmp/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=100Mb
+runtime=10
+time_based
diff --git a/itest.py b/tests/itest.py
similarity index 69%
rename from itest.py
rename to tests/itest.py
index daebd1a..e7fd3eb 100644
--- a/itest.py
+++ b/tests/itest.py
@@ -1,3 +1,4 @@
+import re
import abc
import json
import os.path
@@ -5,7 +6,7 @@
from StringIO import StringIO
from ConfigParser import RawConfigParser
-from tests import io
+from tests import disk_test_agent
from ssh_utils import copy_paths
from utils import run_over_ssh, ssize_to_b
@@ -90,26 +91,27 @@
class IOPerfTest(IPerfTest):
- io_py_remote = "/tmp/io.py"
+ io_py_remote = "/tmp/disk_test_agent.py"
def __init__(self,
test_options,
on_result_cb):
IPerfTest.__init__(self, on_result_cb)
self.options = test_options
- self.config_fname = test_options['config_file']
- self.tool = test_options['tool']
- self.configs = []
+ self.config_fname = test_options['cfg']
+ self.config_params = test_options.get('params', {})
+ self.tool = test_options.get('tool', 'fio')
+ self.raw_cfg = open(self.config_fname).read()
- cp = RawConfigParser()
- cp.readfp(open(self.config_fname))
-
- for secname in cp.sections():
- params = dict(cp.items(secname))
- self.configs.append((secname, params))
+ parse_func = disk_test_agent.parse_fio_config_full
+ self.configs = parse_func(self.raw_cfg, self.config_params)
def pre_run(self, conn):
- local_fname = io.__file__.rsplit('.')[0] + ".py"
+
+ # TODO: install fio, if not installed
+ run_over_ssh(conn, "apt-get -y install fio")
+
+ local_fname = disk_test_agent.__file__.rsplit('.')[0] + ".py"
self.files_to_copy = {local_fname: self.io_py_remote}
copy_paths(conn, self.files_to_copy)
@@ -129,46 +131,22 @@
def run(self, conn, barrier):
cmd_templ = "env python2 {0} --type {1} --json -"
cmd = cmd_templ.format(self.io_py_remote, self.tool)
+ logger.debug("Run {0}".format(cmd))
try:
- for secname, _params in self.configs:
- params = _params.copy()
- count = params.pop('count', 1)
-
- config = RawConfigParser()
- config.add_section(secname)
-
- for k, v in params.items():
- config.set(secname, k, v)
-
- cfg = StringIO()
- config.write(cfg)
-
- # FIX python config parser-fio incompatibility
- # remove spaces around '='
- new_cfg = []
- config_data = cfg.getvalue()
- for line in config_data.split("\n"):
- if '=' in line:
- name, val = line.split('=', 1)
- name = name.strip()
- val = val.strip()
- line = "{0}={1}".format(name, val)
- new_cfg.append(line)
-
- for _ in range(count):
- barrier.wait()
- code, out_err = run_over_ssh(conn, cmd,
- stdin_data="\n".join(new_cfg))
- self.on_result(code, out_err, cmd)
+ barrier.wait()
+ code, out_err = run_over_ssh(conn, cmd, stdin_data=self.raw_cfg)
+ self.on_result(code, out_err, cmd)
finally:
barrier.exit()
def on_result(self, code, out_err, cmd):
if 0 == code:
try:
- for line in out_err.split("\n"):
- if line.strip() != "":
- self.on_result_cb(json.loads(line))
+ start_patt = r"(?ims)=+\s+RESULTS\(format=json\)\s+=+"
+ end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
+ for block in re.split(start_patt, out_err)[1:]:
+ data, garbage = re.split(end_patt, block)
+ self.on_result_cb(json.loads(data.strip()))
except Exception as exc:
msg_templ = "Error during postprocessing results: {0!r}"
raise RuntimeError(msg_templ.format(exc.message))
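A standalone check of the result-block extraction used in on_result above, with the markers as emitted by disk_test_agent.main:

    import re
    import json

    out_err = ("fio noise\n"
               "========= RESULTS(format=json) =========\n"
               '{"res": {}}\n'
               "========= END OF RESULTS =========\n"
               "trailing output\n")

    start_patt = r"(?ims)=+\s+RESULTS\(format=json\)\s+=+"
    end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"

    for block in re.split(start_patt, out_err)[1:]:
        data, _garbage = re.split(end_patt, block)
        print json.loads(data.strip())  # {u'res': {}}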