fixes, add complete test suite
diff --git a/config.yaml b/config.yaml
index 31b6410..3c27c90 100644
--- a/config.yaml
+++ b/config.yaml
@@ -1,36 +1,35 @@
# nodes to be started/detected
-cluster:
- openstack:
- connection:
- auth_url: http://172.16.54.130:5000/v2.0
- creds: admin:admin@admin
- discover:
- vm:
- name: test1
- auth: cirros:cubswin:)
- nodes:
- service: all
- auth: cirros:cubswin:)
- start:
- count: x1
- img_name: TestVM
- flavor_name: m1.micro
- keypair_name: test
- network_zone_name: net04
- flt_ip_pool: net04_ext
- key_file: path/to/
- user: username
+discover:
+ # openstack:
+ # connection:
+ # auth_url: http://172.16.54.130:5000/v2.0
+ # creds: admin:admin@admin
+ # discover:
+ # vm:
+ # name: test1
+ # auth: cirros:cubswin:)
+ # nodes:
+ # service: all
+ # auth: cirros:cubswin:)
- ceph: all
+ ceph: root@172.16.53.2::/home/koder/.ssh/id_rsa.pub
- fuel:
- connection:
- url: http://172.16.52.120:8000/
- creds: admin:admin@admin
- discover: controller
+ # fuel:
+ # connection:
+ # url: http://172.16.52.120:8000/
+ # creds: admin:admin@admin
+ # discover: controller
- other:
- - ssh://10.33.14.67, iscsi
+start:
+ count: x1
+ img_name: TestVM
+ flavor_name: m1.micro
+ keypair_name: test
+ network_zone_name: net04
+ flt_ip_pool: net04_ext
+ key_file: path/to/
+ user: username
+
# sensors to be installed, accordingli to role
sensors:
@@ -39,6 +38,7 @@
os-compute: io, net
test-vm: system-cpu
+
# tests to run
tests:
pgbench:
diff --git a/itest.py b/itest.py
index ad88fc0..2e782ee 100644
--- a/itest.py
+++ b/itest.py
@@ -5,7 +5,7 @@
import logging
from io_scenario import io
-from ssh_copy_directory import copy_paths
+from ssh_utils import copy_paths
from utils import run_over_ssh
diff --git a/nodes/ceph.py b/nodes/ceph.py
index 216f30d..38ce177 100644
--- a/nodes/ceph.py
+++ b/nodes/ceph.py
@@ -1,17 +1,23 @@
""" Collect data about ceph nodes"""
import json
+import logging
+
from node import Node
-from disk_perf_test_tool.ext_libs import sh
+from disk_perf_test_tool.ssh_utils import connect
-def discover_ceph_node():
+logger = logging.getLogger("io-perf-tool")
+
+
+def discover_ceph_node(ip):
""" Return list of ceph's nodes ips """
ips = {}
+ ssh = connect(ip)
- osd_ips = get_osds_ips(get_osds_list())
- mon_ips = get_mons_or_mds_ips("mon")
- mds_ips = get_mons_or_mds_ips("mds")
+ osd_ips = get_osds_ips(ssh, get_osds_list(ssh))
+ mon_ips = get_mons_or_mds_ips(ssh, "mon")
+ mds_ips = get_mons_or_mds_ips(ssh, "mds")
for ip in osd_ips:
url = "ssh://%s" % ip
@@ -28,23 +34,24 @@
return [Node(ip=url, roles=list(roles)) for url, roles in ips.items()]
-def get_osds_list():
+def get_osds_list(ssh):
""" Get list of osds id"""
- return filter(None, sh.ceph.osd.ls().split("\n"))
+ _, chan, _ = ssh.exec_command("ceph osd ls")
+ return filter(None, chan.read().split("\n"))
-def get_mons_or_mds_ips(who):
+def get_mons_or_mds_ips(ssh, who):
""" Return mon ip list
:param who - "mon" or "mds" """
if who == "mon":
- res = sh.ceph.mon.dump()
+ _, chan, _ = ssh.exec_command("ceph mon dump")
elif who == "mds":
- res = sh.ceph.mds.dump()
+ _, chan, _ = ssh.exec_command("ceph mds dump")
else:
raise ValueError(("'%s' in get_mons_or_mds_ips instead" +
"of mon/mds") % who)
- line_res = res.split("\n")
+ line_res = chan.read().split("\n")
ips = set()
for line in line_res:
@@ -60,12 +67,12 @@
return ips
-def get_osds_ips(osd_list):
+def get_osds_ips(ssh, osd_list):
""" Get osd's ips
:param osd_list - list of osd names from osd ls command"""
ips = set()
for osd_id in osd_list:
- res = sh.ceph.osd.find(osd_id)
- ip = json.loads(str(res))["ip"]
+ _, chan, _ = ssh.exec_command("ceph osd find {0}".format(osd_id))
+ ip = json.loads(chan.read())["ip"]
ips.add(ip.split(":")[0])
return ips
diff --git a/nodes/discover.py b/nodes/discover.py
index d36e517..ec98890 100644
--- a/nodes/discover.py
+++ b/nodes/discover.py
@@ -50,5 +50,5 @@
nodes_to_run.extend(fuel.discover_fuel_nodes(url, creads, roles))
if cluster == "ceph":
- nodes_to_run.extend(ceph.discover_ceph_node())
+ nodes_to_run.extend(ceph.discover_ceph_node(cluster_info["ip"]))
return nodes_to_run
diff --git a/nodes/openstack.py b/nodes/openstack.py
index 558985d..ab41f62 100644
--- a/nodes/openstack.py
+++ b/nodes/openstack.py
@@ -1,5 +1,7 @@
+import socket
import logging
+
from novaclient.client import Client
import node
@@ -41,11 +43,11 @@
services.extend(client.services.list(binary=s))
host_services_mapping = {}
+
for service in services:
- if host_services_mapping.get(service.host):
- host_services_mapping[service.host].append(service.binary)
- else:
- host_services_mapping[service.host] = [service.binary]
+ ip = socket.gethostbyname(service.host)
+        host_services_mapping.setdefault(ip, []).append(service.binary)
+
logger.debug("Found %s openstack service nodes" %
len(host_services_mapping))
return [node.Node(host, services, username=user,
diff --git a/run_test.py b/run_test.py
index a132e16..25da0ab 100755
--- a/run_test.py
+++ b/run_test.py
@@ -54,6 +54,20 @@
return test_runner(obj)
+def conn_func(obj, barrier, latest_start_time, conn):
+ try:
+ test_iter = itest.run_test_iter(obj, conn)
+ next(test_iter)
+
+ wait_on_barrier(barrier, latest_start_time)
+
+ with log_error("!Run test"):
+ return next(test_iter)
+ except:
+ print traceback.format_exc()
+ raise
+
+
def parse_args(argv):
parser = argparse.ArgumentParser(
description="Run disk io performance test")
@@ -114,8 +128,8 @@
def connect_one(node):
try:
node.connection = ssh_utils.connect(node.connection_url)
- except Exception as exc:
- logger.exceprtion()
+ except Exception:
+        logger.exception("Connection to node failed")
def connect_all(nodes):
@@ -131,15 +145,15 @@
opts = parse_args(argv)
if 'discover' in opts.stages:
- current_data = discover.discover(cfg_dict.get('cluster'))
+ current_data = discover.discover(cfg_dict.get('discover'))
if 'connect' in opts.stages:
for node in current_data:
+ pass
print "\n".join(map(str, current_data))
return 0
-
# tests = cfg_dict.get("tests", [])
# Deploy and start sensors
@@ -150,7 +164,7 @@
# logger.debug("Run test with {0!r} params".format(cmd_line))
# latest_start_time = 300 + time.time()
# uris = [node.connection_url for node in nodes_to_run]
- # runner = ssh_runner.get_ssh_runner(uris,
+ # runner = ssh_runner.get_ssh_runner(uris, conn_func,
# latest_start_time,
# opts.get('keep_temp_files'))
# res = run_io_test(test_name,
diff --git a/scripts/single_node_test_complete.sh b/scripts/single_node_test_complete.sh
new file mode 100644
index 0000000..c5982cb
--- /dev/null
+++ b/scripts/single_node_test_complete.sh
@@ -0,0 +1,66 @@
+#!/bin/bash
+set -x
+
+TEST_FILE=$1
+OUT_FILE=$2
+NUM_CYCLES=7
+# TESTS_PER_CYCLE=9
+
+# COUNTER=0
+# (( NUM_TESTS=$NUM_CYCLES * $TESTS_PER_CYCLE))
+
+# function next() {
+# echo "Done $COUNTER tests from $NUM_TESTS"
+# (( COUNTER=$COUNTER + 1 ))
+# }
+
+function super_sync() {
+ sync
+ echo 3 > /proc/sys/vm/drop_caches
+}
+
+function run_tests(){
+ OPTS="--test-file $TEST_FILE --type fio --iodepth 1 --iosize 10G"
+ OPERS="read write randread randwrite"
+ CONCS="1 4 8 64"
+ SIZES="4k 16k 64k 256k 1m 2m"
+
+ # num cycles = 6 * 4 * 7 * 4 + 7 * 4 * 4 == 784 == 13 hours
+
+ super_sync ; dd if=/dev/zero of=$TEST_FILE bs=1048576 count=10240
+
+ for cycle in $(seq $NUM_CYCLES) ; do
+ for conc in $CONCS ; do
+ for bsize in $SIZES ; do
+ for operation in $OPERS ; do
+ super_sync ; python io.py $OPTS -a $operation --blocksize $bsize -d --concurrency $conc
+ done
+ done
+ done
+ done
+
+ for cycle in $(seq $NUM_CYCLES) ; do
+ for conc in $CONCS ; do
+ for operation in $OPERS ; do
+ super_sync ; python io.py $OPTS -a $operation --blocksize 4k -s --concurrency $conc
+ done
+ done
+ done
+
+ super_sync ; python io.py $OPTS -a write --blocksize 2m --concurrency 1
+ super_sync ; python io.py $OPTS -a write --blocksize 2m --concurrency 1
+ super_sync ; python io.py $OPTS -a write --blocksize 2m --concurrency 1
+
+ OPTS="--test-file $TEST_FILE --type fio --iodepth 1 --iosize 1G"
+ for cycle in $(seq $NUM_CYCLES) ; do
+ super_sync ; python io.py $OPTS -a randwrite --blocksize 4k -d --concurrency 1
+ done
+
+ OPTS="--test-file $TEST_FILE --type fio --iodepth 1 --iosize 10G"
+ # need to test different file sizes
+ # need to test different timeouts - maybe we can decrease test time
+}
+
+run_tests "$TEST_FILE" 2>&1 | tee "$OUT_FILE"
+
+
diff --git a/ssh_utils.py b/ssh_utils.py
index a7dda3f..7c859cf 100644
--- a/ssh_utils.py
+++ b/ssh_utils.py
@@ -1,20 +1,66 @@
import re
+import time
import Queue
import logging
import os.path
-import traceback
+import getpass
import threading
+import socket
+import paramiko
from concurrent.futures import ThreadPoolExecutor
-import itest
-from utils import ssh_connect
-from utils import get_barrier, log_error, wait_on_barrier
+from utils import get_barrier
logger = logging.getLogger("io-perf-tool")
conn_uri_attrs = ("user", "passwd", "host", "port", "path")
+def ssh_connect(creds, retry_count=60, timeout=1):
+ ssh = paramiko.SSHClient()
+ ssh.load_host_keys('/dev/null')
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ ssh.known_hosts = None
+ for i in range(retry_count):
+ try:
+ if creds.user is None:
+ user = getpass.getuser()
+ else:
+ user = creds.user
+
+ if creds.passwd is not None:
+ ssh.connect(creds.host,
+ username=user,
+ password=creds.passwd,
+ port=creds.port,
+ allow_agent=False,
+ look_for_keys=False)
+ return ssh
+
+ if creds.key_file is not None:
+ ssh.connect(creds.host,
+ username=user,
+ key_filename=creds.key_file,
+ look_for_keys=False,
+ port=creds.port)
+ return ssh
+
+ key_file = os.path.expanduser('~/.ssh/id_rsa')
+ ssh.connect(creds.host,
+ username=user,
+ key_filename=key_file,
+ look_for_keys=False,
+ port=creds.port)
+ return ssh
+ # raise ValueError("Wrong credentials {0}".format(creds.__dict__))
+ except paramiko.PasswordRequiredException:
+ raise
+ except socket.error:
+ if i == retry_count - 1:
+ raise
+ time.sleep(timeout)
+
+
def normalize_dirpath(dirpath):
while dirpath.endswith("/"):
dirpath = dirpath[:-1]
@@ -178,21 +224,8 @@
return ssh_connect(creds)
-def conn_func(obj, barrier, latest_start_time, conn):
- try:
- test_iter = itest.run_test_iter(obj, conn)
- next(test_iter)
-
- wait_on_barrier(barrier, latest_start_time)
-
- with log_error("!Run test"):
- return next(test_iter)
- except:
- print traceback.format_exc()
- raise
-
-
def get_ssh_runner(uris,
+ conn_func,
latest_start_time=None,
keep_temp_files=False):
logger.debug("Connecting to servers")
diff --git a/utils.py b/utils.py
index 8b6e1b0..059101a 100644
--- a/utils.py
+++ b/utils.py
@@ -1,14 +1,11 @@
import time
import socket
import os.path
-import getpass
import logging
import threading
import contextlib
import multiprocessing
-import paramiko
-
logger = logging.getLogger("io-perf-tool")
@@ -46,51 +43,6 @@
return closure
-def ssh_connect(creds, retry_count=60, timeout=1):
- ssh = paramiko.SSHClient()
- ssh.load_host_keys('/dev/null')
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.known_hosts = None
- for i in range(retry_count):
- try:
- if creds.user is None:
- user = getpass.getuser()
- else:
- user = creds.user
-
- if creds.passwd is not None:
- ssh.connect(creds.host,
- username=user,
- password=creds.passwd,
- port=creds.port,
- allow_agent=False,
- look_for_keys=False)
- return ssh
-
- if creds.key_file is not None:
- ssh.connect(creds.host,
- username=user,
- key_filename=creds.key_file,
- look_for_keys=False,
- port=creds.port)
- return ssh
-
- key_file = os.path.expanduser('~/.ssh/id_rsa')
- ssh.connect(creds.host,
- username=user,
- key_filename=key_file,
- look_for_keys=False,
- port=creds.port)
- return ssh
- # raise ValueError("Wrong credentials {0}".format(creds.__dict__))
- except paramiko.PasswordRequiredException:
- raise
- except socket.error:
- if i == retry_count - 1:
- raise
- time.sleep(timeout)
-
-
def wait_on_barrier(barrier, latest_start_time):
if barrier is not None:
if latest_start_time is not None: