add local runner, fix some bugs

- io_scenario/io.py: drop --binary-url support; locate_binary() now returns
  just the binary path, without a remove_binary flag
- run_test.py: add a "local" runner built on LocalConnection; move VM
  spawning into the start_test_vms() context manager; let the ssh runner take
  explicit URIs via --runner-extra-opts; post results to the data server only
  when a URL is given; --extra-logs now switches logging to DEBUG
- ssh_runner.py / utils.py: accept more ssh URI forms (plain host, custom
  port, password) and pass a credentials object to ssh_connect()
- ssh_copy_directory.py: copy directories locally when the sftp object
  provides copytree()
diff --git a/io_scenario/io.py b/io_scenario/io.py
index 7d5516d..7256f02 100644
--- a/io_scenario/io.py
+++ b/io_scenario/io.py
@@ -297,9 +297,9 @@
if binary_path is None:
sys.stderr.write("Can't find iozone or iozone3 binary. "
"Provide --binary-path or --binary-url option")
- return False, None
+ return None
- return False, binary_path
+ return binary_path
# ------------------------------ FIO SUPPORT ---------------------------------
@@ -354,26 +354,14 @@
def locate_fio():
- return False, which('fio')
+ return which('fio')
# ----------------------------------------------------------------------------
-def locate_binary(binary_tp, binary_url, binary_path):
- remove_binary = False
-
- if binary_url is not None:
- if binary_path is not None:
- sys.stderr.write("At most one option from --binary-path and "
- "--binary-url should be provided")
- return False, None
-
- binary_path = os.tmpnam()
- install_iozone_static(binary_url, binary_path)
- remove_binary = True
-
- elif binary_path is not None:
+def locate_binary(binary_tp, binary_path):
+ if binary_path is not None:
if os.path.isfile(binary_path):
if not os.access(binary_path, os.X_OK):
st = os.stat(binary_path)
@@ -382,7 +370,7 @@
binary_path = None
if binary_path is not None:
- return remove_binary, binary_path
+ return binary_path
if 'iozone' == binary_tp:
return locate_iozone()
@@ -459,7 +447,7 @@
choices=['iozone', 'fio'], required=True)
parser.add_argument(
"--iodepth", metavar="IODEPTH", type=int,
- help="I/O depths to test in kb", required=True)
+ help="I/O request queue depth", required=True)
parser.add_argument(
'-a', "--action", metavar="ACTION", type=str,
help="actions to run", required=True,
@@ -480,16 +468,13 @@
"-d", "--direct-io", default=False, action="store_true",
help="use O_DIRECT", dest='directio')
parser.add_argument(
- "-t", "--sync-time", default=None, type=int,
+ "-t", "--sync-time", default=None, type=int, metavar="UTC_TIME",
help="sleep till sime utc time", dest='sync_time')
parser.add_argument(
- "--binary-url", help="static binary url",
- dest="binary_url", default=None)
- parser.add_argument(
"--test-file", help="file path to run test on",
default=None, dest='test_file')
parser.add_argument(
- "--binary-path", help="binary path",
+ "--binary-path", help="path to the iozone/fio binary",
default=None, dest='binary_path')
parser.add_argument(
"--prepare-only", default=False, dest='prepare_only',
@@ -545,9 +530,8 @@
warnings.simplefilter("ignore")
test_file_name = os.tmpnam()
- remove_binary, binary_path = locate_binary(argv_obj.type,
- argv_obj.binary_url,
- argv_obj.binary_path)
+ binary_path = locate_binary(argv_obj.type,
+ argv_obj.binary_path)
if binary_path is None:
sys.stderr.write("Can't locate binary {0}\n".format(argv_obj.type))
@@ -592,9 +576,6 @@
sys.stdout.write("\n")
finally:
- if remove_binary:
- os.unlink(binary_path)
-
if os.path.isfile(test_file_name) and not argv_obj.prepare_only:
os.unlink(test_file_name)
diff --git a/run_test.py b/run_test.py
index 2f09ef8..b1d5a59 100644
--- a/run_test.py
+++ b/run_test.py
@@ -2,30 +2,34 @@
import sys
import json
import time
+import shutil
import pprint
+import weakref
import logging
import os.path
import argparse
import traceback
+import subprocess
+import contextlib
-import io_scenario
-from itest import IOPerfTest
-from rest_api import add_test
import ssh_runner
+import io_scenario
+from utils import log_error
+from rest_api import add_test
+from itest import IOPerfTest, run_test_iter
+from starts_vms import nova_connect, create_vms_mt, clear_all
try:
import rally_runner
except ImportError:
rally_runner = None
-from starts_vms import nova_connect, create_vms_mt, clear_all
-
logger = logging.getLogger("io-perf-tool")
-logger.setLevel(logging.DEBUG)
+logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
-ch.setLevel(logging.DEBUG)
+ch.setLevel(logging.INFO)
logger.addHandler(ch)
log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
@@ -52,6 +56,74 @@
return test_runner(obj)
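+ # File-like wrapper around a subprocess pipe: read() proxies to the pipe and
+ # .channel returns the owning LocalConnection (held via weakref, presumably
+ # to avoid a reference cycle), mirroring how paramiko exposes the channel on
+ # the file objects returned by exec_command().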
+class FileWrapper(object):
+ def __init__(self, fd, conn):
+ self.fd = fd
+ self.channel_wr = weakref.ref(conn)
+
+ def read(self):
+ return self.fd.read()
+
+ @property
+ def channel(self):
+ return self.channel_wr()
+
+
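+ # Duck-types the small subset of the paramiko SSHClient/SFTPClient API the
+ # runner needs: exec_command() starts a local subprocess, open_sftp()
+ # returns self, and put/mkdir/chmod/copytree operate on the local
+ # filesystem.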
+class LocalConnection(object):
+ def __init__(self):
+ self.proc = None
+
+ def exec_command(self, cmd):
+ PIPE = subprocess.PIPE
+ self.proc = subprocess.Popen(cmd,
+ shell=True,
+ stdout=PIPE,
+ stderr=PIPE,
+ stdin=PIPE)
+ res = (self.proc.stdin,
+ FileWrapper(self.proc.stdout, self),
+ self.proc.stderr)
+
+ return res
+
+ def recv_exit_status(self):
+ return self.proc.wait()
+
+ def open_sftp(self):
+ return self
+
+ def close(self):
+ pass
+
+ def put(self, localfile, remfile):
+ return shutil.copy(localfile, remfile)
+
+ def mkdir(self, remotepath, mode):
+ os.mkdir(remotepath)
+ os.chmod(remotepath, mode)
+
+ def chmod(self, remotepath, mode):
+ os.chmod(remotepath, mode)
+
+ def copytree(self, src, dst):
+ shutil.copytree(src, dst)
+
+
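+ # Runner with the same calling convention as the ssh/rally runners, but
+ # executing the test on this host through a LocalConnection; results are
+ # gathered via obj.set_result_cb().  clear_tmp_files is accepted for
+ # interface parity but is not used yet.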
+def get_local_runner(clear_tmp_files=True):
+ def closure(obj):
+ res = []
+ obj.set_result_cb(res.append)
+ test_iter = run_test_iter(obj,
+ LocalConnection())
+ next(test_iter)
+
+ with log_error("!Run test"):
+ next(test_iter)
+ return res
+
+ return closure
+
+
def parse_args(argv):
parser = argparse.ArgumentParser(
description="Run disk io performance test")
@@ -85,18 +157,23 @@
parser.add_argument("-n", "--lab-name", default=None,
dest="lab_name")
+ parser.add_argument("--create-vms-opts", default=None,
+ help="create VMs before running the ssh runner",
+ dest="create_vms_opts")
+
parser.add_argument("-k", "--keep", default=False,
help="keep temporary files",
dest="keep_temp_files", action='store_true')
- choices = ["ssh"]
+ choices = ["local", "ssh"]
+
if rally_runner is not None:
choices.append("rally")
parser.add_argument("--runner", required=True,
choices=choices, help="runner type")
- parser.add_argument("--runner-extra-opts", default="",
+ parser.add_argument("--runner-extra-opts", default=None,
dest="runner_opts", help="runner extra options")
return parser.parse_args(argv)
@@ -179,17 +256,58 @@
return templ.format(data, format_measurements_stat(res), "=" * 80)
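+ # Context manager: parses "key=val,..." VM spawn options, boots the
+ # requested number of VMs via nova ("count" may be "N" or "xN", meaning N per
+ # nova-compute service), yields ssh URIs for them, and always clears the VMs
+ # on exit.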
+@contextlib.contextmanager
+def start_test_vms(opts):
+ create_vms_opts = {}
+ for opt in opts.split(","):
+ name, val = opt.split("=", 1)
+ create_vms_opts[name] = val
+
+ user = create_vms_opts.pop("user")
+ key_file = create_vms_opts.pop("key_file")
+ aff_group = create_vms_opts.pop("aff_group", None)
+ raw_count = create_vms_opts.pop("count", "x1")
+
+ logger.debug("Connection to nova")
+ nova = nova_connect()
+
+ if raw_count.startswith("x"):
+ logger.debug("Getting amount of compute services")
+ count = len(nova.services.list(binary="nova-compute"))
+ count *= int(raw_count[1:])
+ else:
+ count = int(raw_count)
+
+ if aff_group is not None:
+ scheduler_hints = {'group': aff_group}
+ else:
+ scheduler_hints = None
+
+ create_vms_opts['scheduler_hints'] = scheduler_hints
+
+ logger.debug("Will start {0} vms".format(count))
+
+ try:
+ ips = [i[0] for i in create_vms_mt(nova, count, **create_vms_opts)]
+
+ uris = ["{0}@{1}::{2}".format(user, ip, key_file) for ip in ips]
+
+ yield uris
+ except:
+ traceback.print_exc()
+ finally:
+ logger.debug("Clearing")
+ clear_all(nova)
+
+
def main(argv):
opts = parse_args(argv)
- if not opts.extra_logs:
- def nolog(x):
- pass
+ if opts.extra_logs:
+ logger.setLevel(logging.DEBUG)
+ ch.setLevel(logging.DEBUG)
io_opts = get_io_opts(opts.io_opts_file, opts.io_opts)
- data_server_url = opts.data_server_url
- # lab_name = opts.lab_name
- build_name = opts.build_name
if opts.runner == "rally":
logger.debug("Use rally runner")
@@ -210,47 +328,42 @@
opts.keep_temp_files)
logger.debug(format_result(res))
+ elif opts.runner == "local":
+ logger.debug("Run on local computer")
+ try:
+ for script_args in io_opts:
+ cmd_line = " ".join(script_args)
+ logger.debug("Run test with {0!r} params".format(cmd_line))
+ runner = get_local_runner(opts.keep_temp_files)
+ res = run_io_test(opts.tool_type,
+ script_args,
+ runner,
+ opts.keep_temp_files)
+ logger.debug(format_result(res))
+ except:
+ traceback.print_exc()
+ return 1
+
elif opts.runner == "ssh":
logger.debug("Use ssh runner")
- create_vms_opts = {}
- for opt in opts.runner_opts.split(","):
- name, val = opt.split("=", 1)
- create_vms_opts[name] = val
- user = create_vms_opts.pop("user")
- key_file = create_vms_opts.pop("key_file")
- aff_group = create_vms_opts.pop("aff_group", None)
- raw_count = create_vms_opts.pop("count", "x1")
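+ # Target URIs can come from freshly spawned VMs (--create-vms-opts) and/or
+ # from explicit ';'-separated ssh URIs in --runner-extra-opts.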
+ uris = []
- logger.debug("Connection to nova")
- nova = nova_connect()
-
- if raw_count.startswith("x"):
- logger.debug("Getting amount of compute services")
- count = len(nova.services.list(binary="nova-compute"))
- count *= int(raw_count[1:])
+ if opts.create_vms_opts is not None:
+ vm_context = start_test_vms(opts.create_vms_opts)
+ uris += vm_context.__enter__()
else:
- count = int(raw_count)
+ vm_context = None
- if aff_group is not None:
- scheduler_hints = {'group': aff_group}
- else:
- scheduler_hints = None
+ if opts.runner_opts is not None:
+ uris += opts.runner_opts.split(";")
- create_vms_opts['scheduler_hints'] = scheduler_hints
-
- # nova, amount, keypair_name, img_name,
- # flavor_name, vol_sz=None, network_zone_name=None,
- # flt_ip_pool=None, name_templ='ceph-test-{0}',
- # scheduler_hints=None
-
- logger.debug("Will start {0} vms".format(count))
+ if len(uris) == 0:
+ logger.critical("You need to provide VM spawn options "
+ "(--create-vms-opts) or ssh URIs (--runner-extra-opts)")
+ return 1
try:
- ips = [i[0] for i in create_vms_mt(nova, count, **create_vms_opts)]
-
- uris = ["{0}@{1}::{2}".format(user, ip, key_file) for ip in ips]
-
for script_args in io_opts:
cmd_line = " ".join(script_args)
logger.debug("Run test with {0!r} params".format(cmd_line))
@@ -266,13 +379,17 @@
except:
traceback.print_exc()
+ return 1
finally:
- logger.debug("Clearing")
- clear_all(nova)
+ if vm_context is not None:
+ vm_context.__exit__(None, None, None)  # __exit__ requires the exc info triple
+ logger.debug("Clearing")
- result = json.loads(format_measurements_stat(res))
- result['name'] = build_name
- add_test(build_name, result, data_server_url)
+ if opts.data_server_url:
+ result = json.loads(format_measurements_stat(res))
+ result['name'] = opts.build_name
+ add_test(opts.build_name, result, opts.data_server_url)
+
return 0
diff --git a/ssh_copy_directory.py b/ssh_copy_directory.py
index d074fcf..265133c 100644
--- a/ssh_copy_directory.py
+++ b/ssh_copy_directory.py
@@ -29,6 +29,11 @@
def put_dir_recursively(sftp, localpath, remotepath, preserve_perm=True):
"upload local directory to remote recursively"
+ # hack for localhost connection
+ if hasattr(sftp, "copytree"):
+ sftp.copytree(localpath, remotepath)
+ return
+
assert remotepath.startswith("/"), "%s must be absolute path" % remotepath
# normalize
diff --git a/ssh_runner.py b/ssh_runner.py
index d5d7aca..ad3f82e 100644
--- a/ssh_runner.py
+++ b/ssh_runner.py
@@ -30,7 +30,8 @@
user_rr = "[^:]*?"
host_rr = "[^:]*?"
port_rr = "\\d+"
- key_file_rr = ".*"
+ key_file_rr = "[^:@]*"
+ passwd_rr = ".*?"
re_dct = ReParts.__dict__
@@ -42,7 +43,11 @@
re_dct = ReParts.__dict__
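+ # Accepted URI forms (the password forms use a literal "@@" before host):
+ #   host
+ #   user@host::path_to_key_file
+ #   user@host:port:path_to_key_file
+ #   user:passwd@@host
+ #   user:passwd@@host:port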
templs = [
- "^{user_rr}@{host_rr}::{key_file_rr}$"
+ "^{host_rr}$",
+ "^{user_rr}@{host_rr}::{key_file_rr}$",
+ "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$",
+ "^{user_rr}:{passwd_rr}@@{host_rr}$",
+ "^{user_rr}:{passwd_rr}@@{host_rr}:{port_rr}$",
]
for templ in templs:
@@ -62,6 +67,7 @@
# ip_host::path_to_key_file
res = ConnCreds()
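+ # default to the standard ssh port when the URI does not specify one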
+ res.port = "22"
for rr in uri_reg_exprs:
rrm = re.match(rr, uri)
@@ -73,7 +79,8 @@
def connect(uri):
creds = parse_ssh_uri(uri)
- return ssh_connect(creds.host, creds.user, creds.key_file)
+ creds.port = int(creds.port)
+ return ssh_connect(creds)
def conn_func(obj, barrier, latest_start_time, conn):
diff --git a/utils.py b/utils.py
index 78a8df6..f73090a 100644
--- a/utils.py
+++ b/utils.py
@@ -1,4 +1,5 @@
import time
+import socket
import logging
import threading
import contextlib
@@ -31,18 +32,33 @@
return closure
-def ssh_connect(host, user, key_file, retry_count=60, timeout=1):
+def ssh_connect(creds, retry_count=60, timeout=1):
ssh = paramiko.SSHClient()
ssh.load_host_keys('/dev/null')
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.known_hosts = None
-
for i in range(retry_count):
try:
- ssh.connect(host, username=user, key_filename=key_file,
- look_for_keys=False)
- return ssh
- except:
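+ # Prefer password auth when the URI carried a password, otherwise fall
+ # back to key-file auth; only transient socket errors are retried.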
+ if creds.passwd is not None:
+ ssh.connect(creds.host,
+ username=creds.user,
+ password=creds.passwd,
+ port=creds.port,
+ allow_agent=False,
+ look_for_keys=False)
+ return ssh
+
+ if creds.key_file is not None:
+ ssh.connect(creds.host,
+ username=creds.user,
+ key_filename=creds.key_file,
+ look_for_keys=False,
+ port=creds.port)
+ return ssh
+ raise ValueError("Credentials provide neither password nor "
+ "key file: {0}".format(creds.__dict__))
+ except paramiko.PasswordRequiredException:
+ raise
+ except socket.error:
if i == retry_count - 1:
raise
time.sleep(timeout)
@@ -62,6 +78,8 @@
if not barrier(timeout):
logger.debug("Barrier timed out")
+ else:
+ logger.debug("Passed barrier, starting test")
@contextlib.contextmanager