tests finally can work over pure novaclient/ssh
diff --git a/itest.py b/itest.py
index 4f631d7..d4f4f52 100644
--- a/itest.py
+++ b/itest.py
@@ -5,10 +5,14 @@
from io_scenario import io
from ssh_copy_directory import copy_paths
+from utils import run_over_ssh
class IPerfTest(object):
def __init__(self, on_result_cb):
+ self.set_result_cb(on_result_cb)
+
+ def set_result_cb(self, on_result_cb):
self.on_result_cb = on_result_cb
def build(self, conn):
@@ -54,9 +58,8 @@
def run(self, conn):
args = ['env', 'python2', self.io_py_remote] + self.script_opts
- code, out, err = conn.execute(" ".join(args))
+ code, out, err = run_over_ssh(conn, " ".join(args))
self.on_result(code, out, err)
- return code, out, err
def on_result(self, code, out, err):
if 0 == code:
diff --git a/log.py b/log.py
new file mode 100644
index 0000000..fdef091
--- /dev/null
+++ b/log.py
@@ -0,0 +1,21 @@
+import os
+import sys
+import datetime
+
+
+def _log(x):
+ dt_str = datetime.datetime.now().strftime("%H:%M:%S")
+ pref = dt_str + " " + str(os.getpid()) + " >>>> "
+ sys.stderr.write(pref + x.replace("\n", "\n" + pref) + "\n")
+
+
+logger = _log
+
+
+def log(x):
+ logger(x)
+
+
+def setlogger(nlogger):
+ global logger
+ logger = nlogger
diff --git a/rally_runner.py b/rally_runner.py
index e9c4caa..093ae12 100644
--- a/rally_runner.py
+++ b/rally_runner.py
@@ -5,6 +5,7 @@
import warnings
import functools
import contextlib
+import multiprocessing
from rally import exceptions
from rally.cmd import cliutils
@@ -13,11 +14,8 @@
from rally.benchmark.scenarios.vm.vmtasks import VMTasks
import itest
-from utils import get_barrier
-
-
-def log(x):
- pass
+from log import log
+from utils import get_barrier, wait_on_barrier
@contextlib.contextmanager
@@ -75,25 +73,13 @@
raise exceptions.ScriptError("monkeypatch code fails on "
"ssh._client.open_sftp()")
- test_iter = itest.run_test_iter(test_obj, ssh)
+ test_iter = itest.run_test_iter(test_obj, ssh._get_client())
next(test_iter)
log("Start io test")
- if barrier is not None:
- if latest_start_time is not None:
- timeout = latest_start_time - time.time()
- else:
- timeout = None
-
- if timeout is not None and timeout > 0:
- msg = "Ready and waiting on barrier. " + \
- "Will wait at most {0} seconds"
- log(msg.format(int(timeout)))
-
- if not barrier(timeout):
- log("Barrier timeouted")
+ wait_on_barrier(barrier, latest_start_time)
try:
code, out, err = next(test_iter)
@@ -120,10 +106,6 @@
VMScenario.run_action = orig
-def run_rally(rally_args):
- return cliutils.run(['rally', "--rally-debug"] + rally_args, categories)
-
-
def prepare_files(files_dir):
# we do need temporary named files
@@ -164,7 +146,7 @@
with do_patch1(obj, barrier, max_release_time):
opts = ['task', 'start', yaml_file] + list(rally_extra_opts)
log("Start rally with opts '{0}'".format(" ".join(opts)))
- run_rally(opts)
+ cliutils.run(['rally', "--rally-debug"] + opts, categories)
finally:
if not keep_temp_files:
os.unlink(yaml_file)
@@ -176,9 +158,18 @@
keep_temp_files):
def closure(obj):
+ result_queue = multiprocessing.Queue()
+ obj.set_result_cb(result_queue.put)
+
run_tests_using_rally(obj,
files_dir,
max_preparation_time,
rally_extra_opts,
keep_temp_files)
+
+ test_result = []
+ while not result_queue.empty():
+ test_result.append(result_queue.get())
+
+ return test_result
return closure
diff --git a/run_rally_test.py b/run_rally_test.py
deleted file mode 100644
index 5a4df58..0000000
--- a/run_rally_test.py
+++ /dev/null
@@ -1,137 +0,0 @@
-import os
-import sys
-import json
-import pprint
-import os.path
-import argparse
-import datetime
-import multiprocessing
-
-import io_scenario
-import rally_runner
-from itest import IOPerfTest
-
-
-def log(x):
- dt_str = datetime.datetime.now().strftime("%H:%M:%S")
- pref = dt_str + " " + str(os.getpid()) + " >>>> "
- sys.stderr.write(pref + x.replace("\n", "\n" + pref) + "\n")
-
-
-def run_io_test(tool,
- script_args,
- test_runner,
- keep_temp_files=False):
-
- files_dir = os.path.dirname(io_scenario.__file__)
-
- path = 'iozone' if 'iozone' == tool else 'fio'
- src_testtool_path = os.path.join(files_dir, path)
-
- result_queue = multiprocessing.Queue()
-
- obj = IOPerfTest(script_args,
- src_testtool_path,
- result_queue.put,
- keep_temp_files)
-
- test_runner(obj)
-
- test_result = []
- while not result_queue.empty():
- test_result.append(result_queue.get())
-
- return test_result
-
-
-def parse_args(argv):
- parser = argparse.ArgumentParser(
- description="Run rally disk io performance test")
- parser.add_argument("tool_type", help="test tool type",
- choices=['iozone', 'fio'])
- parser.add_argument("-l", dest='extra_logs',
- action='store_true', default=False,
- help="print some extra log info")
- parser.add_argument("-o", "--io-opts", dest='io_opts',
- required=True,
- help="cmd line options for io.py")
- parser.add_argument("-t", "--test-directory", help="directory with test",
- dest="test_directory", required=True)
- parser.add_argument("--max-preparation-time", default=300,
- type=int, dest="max_preparation_time")
- parser.add_argument("-k", "--keep", default=False,
- help="keep temporary files",
- dest="keep_temp_files", action='store_true')
- parser.add_argument("--rally-extra-opts", dest="rally_extra_opts",
- default="", help="rally extra options")
-
- return parser.parse_args(argv)
-
-
-def main(argv):
- opts = parse_args(argv)
-
- if not opts.extra_logs:
- global log
-
- def nolog(x):
- pass
-
- log = nolog
- else:
- rally_runner.log = log
-
- script_args = [opt.strip()
- for opt in opts.io_opts.split(" ")
- if opt.strip() != ""]
-
- runner = rally_runner.get_rally_runner(
- files_dir=os.path.dirname(io_scenario.__file__),
- rally_extra_opts=opts.rally_extra_opts.split(" "),
- max_preparation_time=opts.max_preparation_time,
- keep_temp_files=opts.keep_temp_files)
-
- res = run_io_test(opts.tool_type,
- script_args,
- runner,
- opts.keep_temp_files)
-
- print "=" * 80
- print pprint.pformat(res)
- print "=" * 80
-
- if len(res) != 0:
- bw_mean = 0.0
- for measurement in res:
- bw_mean += measurement["bw_mean"]
-
- bw_mean /= len(res)
-
- it = ((bw_mean - measurement["bw_mean"]) ** 2 for measurement in res)
- bw_dev = sum(it) ** 0.5
-
- meta = res[0]['__meta__']
- key = "{0} {1} {2}k".format(meta['action'],
- 's' if meta['sync'] else 'a',
- meta['blocksize'])
-
- print
- print "====> " + json.dumps({key: (int(bw_mean), int(bw_dev))})
- print
- print "=" * 80
-
- return 0
-
-
-ostack_prepare = """
-glance image-create --name 'ubuntu' --disk-format qcow2
---container-format bare --is-public true --copy-from
-https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
-
-nova flavor-create ceph.512 ceph.512 512 50 1
-nova server-group-create --policy anti-affinity ceph
-"""
-
-
-if __name__ == '__main__':
- exit(main(sys.argv[1:]))
diff --git a/run_test.py b/run_test.py
new file mode 100644
index 0000000..04176b6
--- /dev/null
+++ b/run_test.py
@@ -0,0 +1,165 @@
+import os
+import sys
+import json
+import time
+import pprint
+import os.path
+import argparse
+
+import io_scenario
+from itest import IOPerfTest
+from log import setlogger
+
+import ssh_runner
+import rally_runner
+
+from starts_vms import nova_connect, create_vms_mt, clear_all
+
+
+def run_io_test(tool,
+ script_args,
+ test_runner,
+ keep_temp_files=False):
+
+ files_dir = os.path.dirname(io_scenario.__file__)
+
+ path = 'iozone' if 'iozone' == tool else 'fio'
+ src_testtool_path = os.path.join(files_dir, path)
+
+ obj = IOPerfTest(script_args,
+ src_testtool_path,
+ None,
+ keep_temp_files)
+
+ return test_runner(obj)
+
+
+def parse_args(argv):
+ parser = argparse.ArgumentParser(
+ description="Run disk io performance test")
+
+ parser.add_argument("tool_type", help="test tool type",
+ choices=['iozone', 'fio'])
+
+ parser.add_argument("-l", dest='extra_logs',
+ action='store_true', default=False,
+ help="print some extra log info")
+
+ parser.add_argument("-o", "--io-opts", dest='io_opts',
+ required=True,
+ help="cmd line options for io.py")
+
+ parser.add_argument("-t", "--test-directory", help="directory with test",
+ dest="test_directory", required=True)
+
+ parser.add_argument("--max-preparation-time", default=300,
+ type=int, dest="max_preparation_time")
+
+ parser.add_argument("-k", "--keep", default=False,
+ help="keep temporary files",
+ dest="keep_temp_files", action='store_true')
+
+ parser.add_argument("--runner", required=True,
+ choices=["ssh", "rally"],
+ help="runner type")
+
+ parser.add_argument("--runner-extra-opts", default="",
+ dest="runner_opts", help="runner extra options")
+
+ return parser.parse_args(argv)
+
+
+def main(argv):
+ opts = parse_args(argv)
+
+ if not opts.extra_logs:
+ def nolog(x):
+ pass
+
+ setlogger(nolog)
+
+ script_args = [opt.strip()
+ for opt in opts.io_opts.split(" ")
+ if opt.strip() != ""]
+
+ if opts.runner == "rally":
+ runner = rally_runner.get_rally_runner(
+ files_dir=os.path.dirname(io_scenario.__file__),
+ rally_extra_opts=opts.runner_opts.split(" "),
+ max_preparation_time=opts.max_preparation_time,
+ keep_temp_files=opts.keep_temp_files)
+ res = run_io_test(opts.tool_type,
+ script_args,
+ runner,
+ opts.keep_temp_files)
+ elif opts.runner == "ssh":
+ user, key_file = opts.runner_opts.split(" ", 1)
+
+ latest_start_time = opts.max_preparation_time + time.time()
+
+ nova = nova_connect()
+
+ # nova, amount, keypair_name, img_name,
+ # flavor_name, vol_sz=None, network_zone_name=None,
+ # flt_ip_pool=None, name_templ='ceph-test-{}',
+ # scheduler_hints=None
+
+ try:
+ ips = [i[0] for i in create_vms_mt(nova, 3,
+ keypair_name='ceph',
+ img_name='ubuntu',
+ flavor_name='ceph.512',
+ network_zone_name='net04',
+ flt_ip_pool='net04_ext')]
+
+ uris = ["{0}@{1}::{2}".format(user, ip, key_file) for ip in ips]
+
+ runner = ssh_runner.get_ssh_runner(uris,
+ latest_start_time,
+ opts.keep_temp_files)
+ res = run_io_test(opts.tool_type,
+ script_args,
+ runner,
+ opts.keep_temp_files)
+ finally:
+ clear_all(nova)
+
+ print "=" * 80
+ print pprint.pformat(res)
+ print "=" * 80
+
+ if len(res) != 0:
+ bw_mean = 0.0
+ for measurement in res:
+ bw_mean += measurement["bw_mean"]
+
+ bw_mean /= len(res)
+
+ it = ((bw_mean - measurement["bw_mean"]) ** 2 for measurement in res)
+ bw_dev = sum(it) ** 0.5
+
+ meta = res[0]['__meta__']
+ key = "{0} {1} {2}k".format(meta['action'],
+ 's' if meta['sync'] else 'a',
+ meta['blocksize'])
+
+ print
+ print "====> " + json.dumps({key: (int(bw_mean), int(bw_dev))})
+ print
+ print "=" * 80
+
+ return 0
+
+
+ostack_prepare = """
+glance image-create --name 'ubuntu' --disk-format qcow2
+--container-format bare --is-public true --copy-from
+https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
+
+nova flavor-create ceph.512 ceph.512 512 50 1
+nova server-group-create --policy anti-affinity ceph
+"""
+
+
+if __name__ == '__main__':
+ exit(main(sys.argv[1:]))
diff --git a/scripts/run.sh b/scripts/run.sh
index 342356e..b5bc41c 100644
--- a/scripts/run.sh
+++ b/scripts/run.sh
@@ -7,12 +7,12 @@
bsizes="1k 4k 64k 256k 1m"
ops="write randwrite"
osync="s a"
-three_times="1 2 3"
+num_times=3
for bsize in $bsizes ; do
for op in $ops ; do
for sync in $osync ; do
- for xxx in $three_times ; do
+ for counter in $(seq 1 $num_times) ; do
if [[ "$ops" == "write" && "$osync" == "s" ]] ; then
continue
fi
@@ -30,7 +30,7 @@
fi
io_opts="--type $type -a $op --iodepth 16 --blocksize $bsize --iosize $factor $ssync"
- python run_rally_test.py -l -o "$io_opts" -t io-scenario $type --rally-extra-opts="--deployment $1"
+ python run_test.py --runner rally -l -o "$io_opts" -t io-scenario $type --runner-extra-opts="--deployment $1"
done
done
done
diff --git a/scripts/run_2.sh b/scripts/run_2.sh
index 7bd92fb..2265176 100644
--- a/scripts/run_2.sh
+++ b/scripts/run_2.sh
@@ -1,8 +1,33 @@
#!/bin/bash
set -x
-set -e
type="iozone"
+# nova image-list | grep ' ubuntu ' >/dev/null
+# if [ $? -ne 0 ] ; then
+# url="https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img"
+# glance image-create --name 'ubuntu' --disk-format qcow2 --container-format bare --is-public true --copy-from $url
+# fi
+
+# nova flavor-list | grep ' ceph.512 ' >/dev/null
+# if [ $? -ne 0 ] ; then
+# nova flavor-create ceph.512 ceph.512 512 50 1
+# fi
+
+# nova server-group-list | grep ' ceph ' >/dev/null
+# if [ $? -ne 0 ] ; then
+# nova server-group-create --policy anti-affinity ceph
+# fi
+
+# nova keypair-list | grep ' ceph ' >/dev/null
+# if [ $? -ne 0 ] ; then
+# nova keypair-add ceph > ceph.pem
+# fi
+
+set -e
+
io_opts="--type $type -a write --iodepth 16 --blocksize 1m --iosize x20"
-python run_rally_test.py -l -o "$io_opts" -t io-scenario $type --rally-extra-opts="--deployment $1"
+python run_test.py --runner ssh -l -o "$io_opts" -t io-scenario $type --runner-extra-opts="ubuntu ceph.pem"
+
+# io_opts="--type $type -a write --iodepth 16 --blocksize 1m --iosize x20"
+# python run_test.py --runner rally -l -o "$io_opts" -t io-scenario $type --runner-extra-opts="--deployment perf-1"
diff --git a/ssh_copy_directory.py b/ssh_copy_directory.py
index 5843c18..d074fcf 100644
--- a/ssh_copy_directory.py
+++ b/ssh_copy_directory.py
@@ -69,7 +69,7 @@
def copy_paths(conn, paths):
- sftp = conn._client.open_sftp()
+ sftp = conn.open_sftp()
try:
for src, dst in paths.items():
try:
diff --git a/ssh_runner.py b/ssh_runner.py
new file mode 100644
index 0000000..76fc6b5
--- /dev/null
+++ b/ssh_runner.py
@@ -0,0 +1,116 @@
+import re
+import Queue
+import traceback
+import threading
+
+from utils import ssh_connect
+
+import itest
+from utils import get_barrier, log_error
+
+conn_uri_attrs = ("user", "passwd", "host", "port", "path")
+
+
+class ConnCreds(object):
+ def __init__(self):
+ for name in conn_uri_attrs:
+ setattr(self, name, None)
+
+
+uri_reg_exprs = []
+
+
+class URIsNamespace(object):
+ class ReParts(object):
+ user_rr = "[^:]*?"
+ host_rr = "[^:]*?"
+ port_rr = "\\d+"
+ key_file_rr = ".*"
+
+ re_dct = ReParts.__dict__
+
+ for attr_name, val in re_dct.items():
+ if attr_name.endswith('_rr'):
+ new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
+ setattr(ReParts, attr_name, new_rr)
+
+ re_dct = ReParts.__dict__
+
+ templs = [
+ "^{user_rr}@{host_rr}::{key_file_rr}$"
+ ]
+
+ for templ in templs:
+ uri_reg_exprs.append(templ.format(**re_dct))
+
+
+def parse_ssh_uri(uri):
+ # user:passwd@ip_host:port
+ # user:passwd@ip_host
+ # user@ip_host:port
+ # user@ip_host
+ # ip_host:port
+ # ip_host
+ # user@ip_host:port:path_to_key_file
+ # user@ip_host::path_to_key_file
+ # ip_host:port:path_to_key_file
+ # ip_host::path_to_key_file
+
+ res = ConnCreds()
+
+ for rr in uri_reg_exprs:
+ rrm = re.match(rr, uri)
+ if rrm is not None:
+ res.__dict__.update(rrm.groupdict())
+ return res
+ raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
+
+
+def connect(uri):
+ creds = parse_ssh_uri(uri)
+ return ssh_connect(creds.host, creds.user, creds.key_file)
+
+
+def conn_func(obj, barrier, latest_start_time, conn):
+ try:
+ test_iter = itest.run_test_iter(obj, conn)
+ next(test_iter)
+
+ with log_error("!Run test"):
+ return next(test_iter)
+ except:
+ print traceback.format_exc()
+ raise
+
+
+def get_ssh_runner(uris,
+ latest_start_time=None,
+ keep_temp_files=False):
+
+ connections = [connect(uri) for uri in uris]
+ result_queue = Queue.Queue()
+ barrier = get_barrier(len(uris), threaded=True)
+
+ def closure(obj):
+ ths = []
+ obj.set_result_cb(result_queue.put)
+
+ params = (obj, barrier, latest_start_time)
+
+ for conn in connections:
+ th = threading.Thread(None, conn_func, None,
+ params + (conn,))
+ th.daemon = True
+ th.start()
+ ths.append(th)
+
+ for th in ths:
+ th.join()
+
+ test_result = []
+ while not result_queue.empty():
+ test_result.append(result_queue.get())
+
+ return test_result
+
+ return closure
diff --git a/scripts/starts_vms.py b/starts_vms.py
similarity index 62%
rename from scripts/starts_vms.py
rename to starts_vms.py
index b52c2d5..9a00b34 100644
--- a/scripts/starts_vms.py
+++ b/starts_vms.py
@@ -2,7 +2,8 @@
import os
import time
-import paramiko
+from concurrent.futures import ThreadPoolExecutor
+
from novaclient.client import Client as n_client
from cinderclient.v1.client import Client as c_client
@@ -25,10 +26,8 @@
return nova.keypairs.create(name, key.read())
-def create_volume(size, name=None, volid=[0]):
+def create_volume(size, name):
cinder = c_client(*ostack_get_creds())
- name = 'ceph-test-{0}'.format(volid[0])
- volid[0] = volid[0] + 1
vol = cinder.volumes.create(size=size, display_name=name)
err_count = 0
while vol.status != 'available':
@@ -67,83 +66,99 @@
server = nova.servers.get(server)
-def get_or_create_floating_ip(nova, pool, used_ip):
+class Allocate(object):
+ pass
+
+
+def get_floating_ips(nova, pool, amount):
ip_list = nova.floating_ips.list()
if pool is not None:
ip_list = [ip for ip in ip_list if ip.pool == pool]
- ip_list = [ip for ip in ip_list if ip.instance_id is None]
- ip_list = [ip for ip in ip_list if ip.ip not in used_ip]
+ return [ip for ip in ip_list if ip.instance_id is None][:amount]
- if len(ip_list) > 0:
- return ip_list[0]
+
+def create_vms_mt(nova, amount, keypair_name, img_name,
+ flavor_name, vol_sz=None, network_zone_name=None,
+ flt_ip_pool=None, name_templ='ceph-test-{}',
+ scheduler_hints=None):
+
+ if network_zone_name is not None:
+ network = nova.networks.find(label=network_zone_name)
+ nics = [{'net-id': network.id}]
else:
- return nova.floating_ips.create(pool)
+ nics = None
-
-def create_vms(nova, amount, keypair_name, img_name,
- flavor_name, vol_sz, network_zone_name=None):
-
- network = nova.networks.find(label=network_zone_name)
- nics = [{'net-id': network.id}]
fl = nova.flavors.find(name=flavor_name)
img = nova.images.find(name=img_name)
- srvs = []
- counter = 0
+ if flt_ip_pool is not None:
+ ips = get_floating_ips(nova, flt_ip_pool, amount)
+ ips += [Allocate] * (amount - len(ips))
+ else:
+ ips = [None] * amount
+
+ print "Try to start {0} servers".format(amount)
+ names = map(name_templ.format, range(amount))
+
+ with ThreadPoolExecutor(max_workers=16) as executor:
+ futures = []
+ for name, flt_ip in zip(names, ips):
+ params = (nova, name, keypair_name, img, fl,
+ nics, vol_sz, flt_ip, scheduler_hints,
+ flt_ip_pool)
+
+ futures.append(executor.submit(create_vm, *params))
+ return [future.result() for future in futures]
+
+
+def create_vm(nova, name, keypair_name, img,
+ fl, nics, vol_sz=None,
+ flt_ip=False,
+ scheduler_hints=None,
+ pool=None):
for i in range(3):
- amount_left = amount - len(srvs)
+ srv = nova.servers.create(name,
+ flavor=fl, image=img, nics=nics,
+ key_name=keypair_name,
+ scheduler_hints=scheduler_hints)
- new_srvs = []
- for i in range(amount_left):
- print "creating server"
- srv = nova.servers.create("ceph-test-{0}".format(counter),
- flavor=fl, image=img, nics=nics,
- key_name=keypair_name)
- counter += 1
- new_srvs.append(srv)
- print srv
+ if not wait_for_server_active(nova, srv):
+ msg = "Server {0} fails to start. Kill it and try again"
+ print msg.format(srv.name)
+ nova.servers.delete(srv)
- deleted_servers = []
- for srv in new_srvs:
- if not wait_for_server_active(nova, srv):
- print "Server", srv.name, "fails to start. Kill it and",
- print " try again"
+ while True:
+ print "wait till server deleted"
+ all_id = set(alive_srv.id for alive_srv in nova.servers.list())
+ if srv.id not in all_id:
+ break
+ time.sleep(1)
+ else:
+ break
- nova.servers.delete(srv)
- deleted_servers.append(srv)
- else:
- srvs.append(srv)
-
- if len(deleted_servers) != 0:
- time.sleep(5)
-
- if len(srvs) != amount:
- print "ERROR: can't start required amount of servers. Exit"
- raise RuntimeError("Fail to create {0} servers".format(amount))
-
- result = {}
- for srv in srvs:
- print "wait till server be ready"
- wait_for_server_active(nova, srv)
+ if vol_sz is not None:
print "creating volume"
- vol = create_volume(vol_sz)
+ vol = create_volume(vol_sz, name)
print "attach volume to server"
nova.volumes.create_server_volume(srv.id, vol.id, None)
- print "create floating ip"
- flt_ip = get_or_create_floating_ip(nova, 'net04_ext', result.keys())
+
+ if flt_ip is Allocate:
+ flt_ip = nova.floating_ips.create(pool)
+
+ if flt_ip is not None:
print "attaching ip to server"
srv.add_floating_ip(flt_ip)
- result[flt_ip.ip] = srv
-
- return result
+ return (flt_ip.ip, srv)
+ else:
+ return (None, srv)
-def clear_all(nova):
+def clear_all(nova, name_templ="ceph-test-{}"):
deleted_srvs = set()
for srv in nova.servers.list():
- if re.match(r"ceph-test-\d+", srv.name):
+ if re.match(name_templ.format("\\d+"), srv.name):
print "Deleting server", srv.name
nova.servers.delete(srv)
deleted_srvs.add(srv.id)
@@ -161,7 +176,7 @@
cinder = c_client(*ostack_get_creds())
for vol in cinder.volumes.list():
if isinstance(vol.display_name, basestring):
- if re.match(r'ceph-test-\d+', vol.display_name):
+ if re.match(name_templ.format("\\d+"), vol.display_name):
if vol.status in ('available', 'error'):
print "Deleting volume", vol.display_name
cinder.volumes.delete(vol)
@@ -169,23 +184,6 @@
print "Clearing done (yet some volumes may still deleting)"
-def wait_ssh_ready(host, user, key_file, retry_count=10, timeout=5):
- ssh = paramiko.SSHClient()
- ssh.load_host_keys('/dev/null')
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.known_hosts = None
-
- for i in range(retry_count):
- try:
- ssh.connect(host, username=user, key_filename=key_file,
- look_for_keys=False)
- break
- except:
- if i == retry_count - 1:
- raise
- time.sleep(timeout)
-
-
# def prepare_host(key_file, ip, fio_path, dst_fio_path, user='cirros'):
# print "Wait till ssh ready...."
# wait_ssh_ready(ip, user, key_file)
@@ -228,7 +226,7 @@
params['amount'] = amount
params['keypair_name'] = keypair_name
- for ip, host in create_vms(nova, **params).items():
+ for ip, host in create_vms(nova, **params):
ips.append(ip)
print "All setup done! Ips =", " ".join(ips)
diff --git a/storage_api.py b/storage_api.py
index 5578984..6f1b714 100644
--- a/storage_api.py
+++ b/storage_api.py
@@ -15,7 +15,7 @@
def __str__(self):
return self.build + " " + self.build_type + " " + \
- self.md5 + " " + str(self.results)
+ self.md5 + " " + str(self.results)
def create_storage(url, email=None, password=None):
diff --git a/utils.py b/utils.py
index 8941ca5..0a38ad5 100644
--- a/utils.py
+++ b/utils.py
@@ -1,9 +1,21 @@
+import time
+import threading
+import contextlib
import multiprocessing
+import paramiko
-def get_barrier(count):
- val = multiprocessing.Value('i', count)
- cond = multiprocessing.Condition()
+from log import log
+
+
+def get_barrier(count, threaded=False):
+ if threaded:
+ class val(object):
+ value = count
+ cond = threading.Condition()
+ else:
+ val = multiprocessing.Value('i', count)
+ cond = multiprocessing.Condition()
def closure(timeout):
with cond:
@@ -15,3 +27,62 @@
return val.value == 0
return closure
+
+
+def ssh_connect(host, user, key_file, retry_count=10, timeout=5):
+ ssh = paramiko.SSHClient()
+ ssh.load_host_keys('/dev/null')
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ ssh.known_hosts = None
+
+ for i in range(retry_count):
+ try:
+ ssh.connect(host, username=user, key_filename=key_file,
+ look_for_keys=False)
+ return ssh
+ except:
+ if i == retry_count - 1:
+ raise
+ time.sleep(timeout)
+
+
+def wait_on_barrier(barrier, latest_start_time):
+ if barrier is not None:
+ if latest_start_time is not None:
+ timeout = latest_start_time - time.time()
+ else:
+ timeout = None
+
+ if timeout is not None and timeout > 0:
+ msg = "Ready and waiting on barrier. " + \
+ "Will wait at most {0} seconds"
+ log(msg.format(int(timeout)))
+
+ if not barrier(timeout):
+ log("Barrier timeouted")
+
+
+@contextlib.contextmanager
+def log_error(action, types=(Exception,)):
+ if not action.startswith("!"):
+ log("Starts : " + action)
+ else:
+ action = action[1:]
+
+ try:
+ yield
+ except Exception as exc:
+ if isinstance(exc, types) and not isinstance(exc, StopIteration):
+ templ = "Error during {0} stage: {1}"
+ log(templ.format(action, exc.message))
+ raise
+
+
+def run_over_ssh(conn, cmd):
+    "should be replaced by a normal implementation, with select"
+
+ stdin, stdout, stderr = conn.exec_command(cmd)
+ out = stdout.read()
+ err = stderr.read()
+ code = stdout.channel.recv_exit_status()
+ return code, out, err