large refactoring, ready to move away from rally
diff --git a/TODO b/TODO
new file mode 100644
index 0000000..9f383d3
--- /dev/null
+++ b/TODO
@@ -0,0 +1,15 @@
+выкинуть все упоминания о googledoc
+нам нужен backlog
+я буду productowner и буду вносить тикеты в борду
+
+код сайты вынести из __init__.py в отдельный файл
+перенести конфиг линта из тестов
+пройти все линтом
+move ssize_to_kb to utils
+unittests
+перейти на нормальное логирование
+
+zmq/tcp client should immitate ssh
+rpyc???
+детектирует vm по локальным ip, которые получаем из nova
+roadmap написать
\ No newline at end of file
diff --git a/data.py b/data.py
deleted file mode 100644
index 5569616..0000000
--- a/data.py
+++ /dev/null
@@ -1,113 +0,0 @@
-import re
-import sys
-
-
-splitter_rr = "(?ms)=====+\n"
-
-
-def get_data_from_output(fname):
- results = {}
- fc = open(fname).read()
-
- for block in re.split(splitter_rr, fc):
- block = block.strip()
- if not block.startswith("[{u'__meta__':"):
- continue
- for val in eval(block):
- meta = val['__meta__']
- meta['sync'] = 's' if meta['sync'] else 'a'
- key = "{action} {sync} {blocksize}k".format(**meta)
- results.setdefault(key, []).append(val['bw_mean'])
-
- processed_res = {}
-
- for k, v in results.items():
- v.sort()
- med = float(sum(v)) / len(v)
- ran = sum(abs(x - med) for x in v) / len(v)
- processed_res[k] = (int(med), int(ran))
-
- return processed_res
-
-
-def ksort(x):
- op, sync, sz = x.split(" ")
- return (op, sync, int(sz[:-1]))
-
-
-def show_data(path1, path2=None):
- templ_1 = " {:>10} {:>5} {:>5} {:>6} ~ {:>5} {:>2}% {:>5}"
- templ_2 = templ_1 + " {:>6} ~ {:>5} {:>2}% {:>5} ---- {:>6}% "
-
- ln_1 = templ_1.replace("<", "^").replace(">", "^")
- ln_1 = ln_1.format("Oper", "Sync", "BSZ", "BW1", "DEV1", "%", "IOPS1")
-
- ln_2 = templ_2.replace("<", "^").replace(">", "^")
- ln_2 = ln_2.format("Oper", "Sync", "BSZ", "BW1", "DEV1", "%",
- "IOPS1", "BW2", "DEV2", "%", "IOPS2", "DIFF %")
-
- sep_1 = '-' * len(ln_1)
- sep_2 = '-' * len(ln_2)
-
- res_1 = get_data_from_output(path1)
-
- if path2 is None:
- res_2 = None
- sep = sep_1
- ln = ln_1
- templ = templ_1
- else:
- res_2 = get_data_from_output(path2)
- sep = sep_2
- ln = ln_2
- templ = templ_2
-
- print sep
- print ln
- print sep
-
- prev_tp = None
-
- common_keys = set(res_1.keys())
-
- if res_2 is not None:
- common_keys &= set(res_2.keys())
-
- for k in sorted(common_keys, key=ksort):
- tp = k.rsplit(" ", 1)[0]
- op, s, sz = k.split(" ")
- s = 'sync' if s == 's' else 'async'
-
- if tp != prev_tp and prev_tp is not None:
- print sep
-
- prev_tp = tp
-
- m1, d1 = res_1[k]
- iops1 = m1 / int(sz[:-1])
- perc1 = int(d1 * 100.0 / m1 + 0.5)
-
- if res_2 is not None:
- m2, d2 = res_2[k]
- iops2 = m2 / int(sz[:-1])
- perc2 = int(d2 * 100.0 / m2 + 0.5)
- avg_diff = int(((m2 - m1) * 100.) / m2 + 0.5)
-
- if res_2 is not None:
- print templ.format(op, s, sz, m1, d1, perc1, iops1,
- m2, d2, perc2, iops2, avg_diff)
- else:
- print templ.format(op, s, sz, m1, d1, perc1, iops1)
-
- print sep
-
-
-def main(argv):
- path1 = argv[0]
- path2 = argv[1] if len(argv) > 1 else None
- show_data(path1, path2)
- return 0
-
-if __name__ == "__main__":
- exit(main(sys.argv[1:]))
-# print " ", results[k]
diff --git a/data_extractor.py b/data_extractor.py
deleted file mode 100644
index d4e6ed1..0000000
--- a/data_extractor.py
+++ /dev/null
@@ -1,116 +0,0 @@
-import sys
-import json
-import sqlite3
-import contextlib
-
-
-def connect(url):
- return sqlite3.connect(url)
-
-
-create_db_sql_templ = """
-CREATE TABLE build (id integer primary key,
- build text,
- type text,
- md5 text);
-
-CREATE TABLE params_combination (id integer primary key, {params});
-
-CREATE TABLE result (build_id integer,
- params_combination integer,
- bandwith float,
- deviation float);
-"""
-
-
-PARAM_COUNT = 20
-
-
-def get_all_tables(conn):
- cursor = conn.cursor()
- cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
- return cursor.fetchall()
-
-
-def drop_database(conn):
- cursor = conn.cursor()
- cursor.execute("drop table result")
- cursor.execute("drop table params_combination")
- cursor.execute("drop table build")
-
-
-def init_database(conn):
- cursor = conn.cursor()
-
- params = ["param_{0} text".format(i) for i in range(PARAM_COUNT)]
- create_db_sql = create_db_sql_templ.format(params=",".join(params))
-
- for sql in create_db_sql.split(";"):
- cursor.execute(sql)
-
-
-def insert_build(cursor, build_id, build_type, iso_md5):
- cursor.execute("insert into build (build, type, md5) values (?, ?, ?)",
- (build_id, build_type, iso_md5))
- return cursor.lastrowid
-
-
-def insert_params(cursor, *param_vals):
- param_vals = param_vals + ("",) * (PARAM_COUNT - len(param_vals))
-
- params = ",".join(['?'] * PARAM_COUNT)
- select_templ = "select id from params_combination where {params_where}"
-
- params_where = ["param_{0}=?".format(i) for i in range(PARAM_COUNT)]
- req = select_templ.format(params_where=" AND ".join(params_where))
- cursor.execute(req, param_vals)
- res = cursor.fetchall()
- if [] != res:
- return res[0][0]
-
- params = ",".join(['?'] * PARAM_COUNT)
- param_insert_templ = "insert into params_combination ({0}) values ({1})"
- param_names = ",".join("param_{0}".format(i) for i in range(PARAM_COUNT))
- req = param_insert_templ.format(param_names, params)
- cursor.execute(req, param_vals)
- return cursor.lastrowid
-
-
-def insert_results(cursor, build_id, params_id, bw, dev):
- req = "insert into result values (?, ?, ?, ?)"
- cursor.execute(req, (build_id, params_id, bw, dev))
-
-
-@contextlib.contextmanager
-def transaction(conn):
- try:
- cursor = conn.cursor()
- yield cursor
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
-
-
-def json_to_db(json_data, conn):
- data = json.loads(json_data)
- with transaction(conn) as cursor:
- for build_data in data:
- build_id = insert_build(cursor,
- build_data.pop("build_id"),
- build_data.pop("type"),
- build_data.pop("iso_md5"))
-
- for params, (bw, dev) in build_data.items():
- param_id = insert_params(cursor, *params.split(" "))
- insert_results(cursor, build_id, param_id, bw, dev)
-
-
-conn = sqlite3.connect(sys.argv[1])
-json_data = open(sys.argv[2]).read()
-
-if len(get_all_tables(conn)) == 0:
- init_database(conn)
-
-json_to_db(json_data, conn)
diff --git a/envs/cz-perf-1.json b/envs/cz-perf-1.json
new file mode 100644
index 0000000..ca6b54c
--- /dev/null
+++ b/envs/cz-perf-1.json
@@ -0,0 +1,12 @@
+{
+ "type": "ExistingCloud",
+ "auth_url": "http://172.16.53.2:5000/v2.0/",
+ "region_name": "RegionOne",
+ "endpoint_type": "public",
+ "endpoint": "http://172.16.53.2:35357/v2.0/",
+ "admin": {
+ "username": "admin",
+ "password": "admin",
+ "tenant_name": "admin"
+ }
+}
\ No newline at end of file
diff --git a/envs/cz-perf-2.json b/envs/cz-perf-2.json
new file mode 100644
index 0000000..81eb1e4
--- /dev/null
+++ b/envs/cz-perf-2.json
@@ -0,0 +1,12 @@
+{
+ "type": "ExistingCloud",
+ "auth_url": "http://172.16.53.66:5000/v2.0/",
+ "region_name": "RegionOne",
+ "endpoint_type": "public",
+ "endpoint": "http://172.16.53.66:35357/v2.0/",
+ "admin": {
+ "username": "admin",
+ "password": "admin",
+ "tenant_name": "admin"
+ }
+}
\ No newline at end of file
diff --git a/io_scenario/__init__.py b/io_scenario/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/io_scenario/__init__.py
diff --git a/io-scenario/fio b/io_scenario/fio
similarity index 100%
rename from io-scenario/fio
rename to io_scenario/fio
Binary files differ
diff --git a/io-scenario/io.py b/io_scenario/io.py
similarity index 92%
rename from io-scenario/io.py
rename to io_scenario/io.py
index 8a3509a..e1756f6 100644
--- a/io-scenario/io.py
+++ b/io_scenario/io.py
@@ -139,31 +139,13 @@
IOZoneParser.make_positions()
-def do_run_iozone(params, filename, timeout, iozone_path='iozone',
- microsecond_mode=False):
-
- PATTERN = "\x6d"
-
- cmd = [iozone_path, "-V", "109"]
-
- if params.sync:
- cmd.append('-o')
-
- if params.direct_io:
- cmd.append('-I')
-
- if microsecond_mode:
- cmd.append('-N')
-
+def iozone_do_prepare(params, filename, pattern):
all_files = []
threads = int(params.concurence)
if 1 != threads:
- cmd.extend(('-t', str(threads), '-F'))
filename = filename + "_{}"
- cmd.extend(filename % i for i in range(threads))
all_files.extend(filename % i for i in range(threads))
else:
- cmd.extend(('-f', filename))
all_files.append(filename)
bsz = 1024 if params.size > 1024 else params.size
@@ -175,12 +157,41 @@
for fname in all_files:
with open(fname, "wb") as fd:
if fsz > 1024:
- pattern = PATTERN * 1024 * 1024
+ pattern = pattern * 1024 * 1024
for _ in range(int(fsz / 1024) + 1):
fd.write(pattern)
else:
- fd.write(PATTERN * 1024 * fsz)
+ fd.write(pattern * 1024 * fsz)
fd.flush()
+ return all_files
+
+
+VERIFY_PATTERN = "\x6d"
+
+
+def do_run_iozone(params, filename, timeout, iozone_path='iozone',
+ microsecond_mode=False,
+ prepare_only=False):
+
+ cmd = [iozone_path, "-V", str(ord(VERIFY_PATTERN))]
+
+ if params.sync:
+ cmd.append('-o')
+
+ if params.direct_io:
+ cmd.append('-I')
+
+ if microsecond_mode:
+ cmd.append('-N')
+
+ all_files = iozone_do_prepare(params, filename, VERIFY_PATTERN)
+
+ threads = int(params.concurence)
+ if 1 != threads:
+ cmd.extend(('-t', str(threads), '-F'))
+ cmd.extend(all_files)
+ else:
+ cmd.extend(('-f', all_files[0]))
cmd.append('-i')
@@ -217,10 +228,6 @@
except:
raise
- # res['bw_dev'] = 0
- # res['bw_max'] = res["bw_mean"]
- # res['bw_min'] = res["bw_mean"]
-
return res, " ".join(cmd)
@@ -318,7 +325,10 @@
return json.loads(raw_out)["jobs"][0], " ".join(cmd_line)
-def run_fio(benchmark, fio_path, tmpname, timeout=None):
+def run_fio(benchmark, fio_path, tmpname, timeout=None, prepare_only=False):
+ if prepare_only:
+ return {}, ""
+
job_output, cmd_line = run_fio_once(benchmark, fio_path, tmpname, timeout)
if benchmark.action in ('write', 'randwrite'):
@@ -473,6 +483,9 @@
parser.add_argument(
"--binary-path", help="binary path",
default=None, dest='binary_path')
+ parser.add_argument(
+ "--prepare-only", default=False, dest='prepare_only',
+ action="store_true")
return parser.parse_args(argv)
@@ -526,31 +539,24 @@
res, cmd = run_benchmark(argv_obj.type,
benchmark,
binary_path,
- test_file_name)
- res['__meta__'] = benchmark.__dict__.copy()
- res['__meta__']['cmdline'] = cmd
- sys.stdout.write(json.dumps(res) + "\n")
+ test_file_name,
+ argv_obj.prepare_only)
+ if not argv_obj.prepare_only:
+ res['__meta__'] = benchmark.__dict__.copy()
+ res['__meta__']['cmdline'] = cmd
+
+ sys.stdout.write(json.dumps(res))
+
+ if not argv_obj.prepare_only:
+ sys.stdout.write("\n")
+
finally:
if remove_binary:
os.unlink(binary_path)
- if os.path.isfile(test_file_name):
+ if os.path.isfile(test_file_name) and not argv_obj.prepare_only:
os.unlink(test_file_name)
-# function-marker for patching, don't 'optimize' it
-def INSERT_TOOL_ARGS(x):
- return [x]
-
-
if __name__ == '__main__':
- # this line would be patched in case of run under rally
- # don't modify it!
- argvs = INSERT_TOOL_ARGS(sys.argv[1:])
- code = 0
- for argv in argvs:
- tcode = main(argv)
- if tcode != 0:
- code = tcode
-
- exit(code)
+ exit(main(sys.argv[1:]))
diff --git a/io-scenario/io.yaml b/io_scenario/io.yaml
similarity index 100%
rename from io-scenario/io.yaml
rename to io_scenario/io.yaml
diff --git a/io-scenario/iozone b/io_scenario/iozone
similarity index 100%
rename from io-scenario/iozone
rename to io_scenario/iozone
Binary files differ
diff --git a/itest.py b/itest.py
new file mode 100644
index 0000000..6457aa8
--- /dev/null
+++ b/itest.py
@@ -0,0 +1,69 @@
+import abc
+import json
+import types
+import os.path
+
+from io_scenario import io
+from ssh_copy_directory import copy_paths
+
+
+class IPerfTest(object):
+ def __init__(self, on_result_cb):
+ self.on_result_cb = on_result_cb
+
+ def build(self, conn):
+ self.pre_run(conn)
+
+ def pre_run(self, conn):
+ pass
+
+ @abc.abstractmethod
+ def run(self, conn):
+ pass
+
+
+def run_test_iter(obj, conn):
+ yield obj.pre_run(conn)
+ res = obj.run(conn)
+ if isinstance(res, types.GeneratorType):
+ for vl in res:
+ yield vl
+ else:
+ yield res
+
+
+class IOPerfTest(IPerfTest):
+ def __init__(self,
+ script_opts,
+ testtool_local,
+ on_result_cb,
+ keep_tmp_files):
+
+ IPerfTest.__init__(self, on_result_cb)
+ dst_testtool_path = '/tmp/io_tool'
+ self.files_to_copy = {testtool_local: dst_testtool_path}
+ self.script_opts = script_opts + ["--binary-path", dst_testtool_path]
+
+ def pre_run(self, conn):
+ copy_paths(self.files_to_copy)
+
+ def run(self, conn):
+ io_py = os.path.dirname(io.__file__)
+
+ if io_py.endswith('.pyc'):
+ io_py = io_py[:-1]
+
+ args = ['env', 'python2', io_py] + self.script_opts
+ code, out, err = conn.execute(args)
+ self.on_result(code, out, err)
+ return code, out, err
+
+ def on_result(self, code, out, err):
+ if 0 == code:
+ try:
+ for line in out.split("\n"):
+ if line.strip() != "":
+ self.on_result_cb(json.loads(line))
+ except Exception as err:
+ msg = "Error during postprocessing results: {0!r}".format(err)
+ raise RuntimeError(msg)
diff --git a/rally_runner.py b/rally_runner.py
new file mode 100644
index 0000000..cc95309
--- /dev/null
+++ b/rally_runner.py
@@ -0,0 +1,189 @@
+import os
+import json
+import time
+import yaml
+import warnings
+import functools
+import contextlib
+
+from rally import exceptions
+from rally.cmd import cliutils
+from rally.cmd.main import categories
+from rally.benchmark.scenarios.vm.utils import VMScenario
+from rally.benchmark.scenarios.vm.vmtasks import VMTasks
+
+import itest
+from utils import get_barrier
+
+
+def log(x):
+ pass
+
+
+@contextlib.contextmanager
+def patch_VMTasks_boot_runcommand_delete():
+
+ try:
+ orig = VMTasks.boot_runcommand_delete
+ except AttributeError:
+ # rally code was changed
+ log("VMTasks class was changed and have no boot_runcommand_delete"
+ " method anymore. Update patch code.")
+ raise exceptions.ScriptError("monkeypatch code fails on "
+ "VMTasks.boot_runcommand_delete")
+
+ @functools.wraps(orig)
+ def new_boot_runcommand_delete(self, *args, **kwargs):
+ if 'rally_affinity_group' in os.environ:
+ group_id = os.environ['rally_affinity_group']
+ kwargs['scheduler_hints'] = {'group': group_id}
+ return orig(self, *args, **kwargs)
+
+ VMTasks.boot_runcommand_delete = new_boot_runcommand_delete
+
+ try:
+ yield
+ finally:
+ VMTasks.boot_runcommand_delete = orig
+
+
+# should actually use mock module for this,
+# but don't wanna to add new dependency
+@contextlib.contextmanager
+def patch_VMScenario_run_command_over_ssh(test_obj,
+ barrier=None,
+ latest_start_time=None):
+
+ try:
+ orig = VMScenario.run_action
+ except AttributeError:
+ # rally code was changed
+ log("VMScenario class was changed and have no run_action"
+ " method anymore. Update patch code.")
+ raise exceptions.ScriptError("monkeypatch code fails on "
+ "VMScenario.run_action")
+
+ @functools.wraps(orig)
+ def closure(self, ssh, *args, **kwargs):
+ try:
+ ssh._client.open_sftp
+ except AttributeError:
+ # rally code was changed
+ log("Prototype of VMScenario.run_command_over_ssh "
+ "was changed. Update patch code.")
+ raise exceptions.ScriptError("monkeypatch code fails on "
+ "ssh._client.open_sftp()")
+
+ test_iter = itest.run_test_iter(test_obj, ssh)
+
+ next(test_iter)
+
+ log("Start io test")
+
+ if barrier is not None:
+ if latest_start_time is not None:
+ timeout = latest_start_time - time.time()
+ else:
+ timeout = None
+
+ if timeout is not None and timeout > 0:
+ msg = "Ready and waiting on barrier. " + \
+ "Will wait at most {0} seconds"
+ log(msg.format(int(timeout)))
+
+ if not barrier(timeout):
+ log("Barrier timeouted")
+
+ try:
+ code, out, err = next(test_iter)
+ except Exception as exc:
+ log("Rally raises exception {0}".format(exc.message))
+ raise
+
+ if 0 != code:
+ templ = "Script returns error! code={0}\n {1}"
+ log(templ.format(code, err.rstrip()))
+ else:
+ log("Test finished")
+
+ # result = {"rally": 0, "srally": 1}
+ result = {"rally": 0}
+ out = json.dumps(result)
+
+ return code, out, err
+
+ VMScenario.run_action = closure
+
+ try:
+ yield
+ finally:
+ VMScenario.run_action = orig
+
+
+def run_rally(rally_args):
+ return cliutils.run(['rally'] + rally_args, categories)
+
+
+def prepare_files(dst_testtool_path, files_dir):
+
+ # we do need temporary named files
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore")
+ yaml_file = os.tmpnam()
+
+ yaml_src_cont = open(os.path.join(files_dir, "io.yaml")).read()
+ task_params = yaml.load(yaml_src_cont)
+ rcd_params = task_params['VMTasks.boot_runcommand_delete']
+ rcd_params[0]['args']['script'] = os.path.join(files_dir, "io.py")
+ yaml_dst_cont = yaml.dump(task_params)
+
+ open(yaml_file, "w").write(yaml_dst_cont)
+
+ return yaml_file
+
+
+def run_tests_using_rally(obj,
+ files_dir,
+ testtool_py_args,
+ dst_testtool_path,
+ max_preparation_time,
+ rally_extra_opts,
+ keep_temp_files):
+
+ yaml_file, py_file = prepare_files(testtool_py_args,
+ dst_testtool_path,
+ files_dir)
+
+ try:
+ do_patch1 = patch_VMScenario_run_command_over_ssh
+ config = yaml.load(open(yaml_file).read())
+
+ vm_sec = 'VMTasks.boot_runcommand_delete'
+ concurrency = config[vm_sec][0]['runner']['concurrency']
+
+ barrier = get_barrier(concurrency)
+ max_release_time = time.time() + max_preparation_time
+
+ with patch_VMTasks_boot_runcommand_delete():
+ with do_patch1(obj, barrier, max_release_time):
+ opts = ['task', 'start', yaml_file] + list(rally_extra_opts)
+ log("Start rally with opts '{0}'".format(" ".join(opts)))
+ run_rally(opts)
+ finally:
+ if not keep_temp_files:
+ os.unlink(yaml_file)
+ os.unlink(py_file)
+
+
+def get_rally_runner(files_dir,
+ max_preparation_time,
+ rally_extra_opts,
+ keep_temp_files):
+
+ def closure(obj):
+ run_tests_using_rally(obj,
+ files_dir,
+ max_preparation_time,
+ rally_extra_opts,
+ keep_temp_files)
+ return closure
diff --git a/run_2.sh b/run_2.sh
deleted file mode 100644
index 566b1d5..0000000
--- a/run_2.sh
+++ /dev/null
@@ -1,43 +0,0 @@
-#!/bin/bash
-set -x
-set -e
-
-type="iozone"
-
-bsizes="1k 4k 64k 256k 1m"
-ops="randwrite write"
-osync="s a"
-three_times="1 2 3"
-
-for bsize in $bsizes ; do
- for op in $ops ; do
- for sync in $osync ; do
- for xxx in $three_times ; do
- if [[ "$sync" == "s" ]] ; then
- ssync="-s"
- factor="x500"
- else
- if [[ "$bsize" == "1k" || "$bsize" == "4k" ]] ; then
- continue
- fi
-
- ssync=
- factor="r2"
- fi
-
- python run_rally_test.py -l -o "--type $type -a $op --iodepth 16 --blocksize $bsize --iosize $factor $ssync" -t io-scenario $type --rally-extra-opts="--deployment perf-2"
- done
- done
- done
-done
-
-bsizes="4k 64k 256k 1m"
-ops="randread read"
-
-for bsize in $bsizes ; do
- for op in $ops ; do
- for xxx in $three_times ; do
- python run_rally_test.py -l -o "--type $type -a $op --iodepth 16 --blocksize $bsize --iosize r2" -t io-scenario $type --rally-extra-opts="--deployment perf-2"
- done
- done
-done
diff --git a/run_rally_test.py b/run_rally_test.py
index ffad0ff..5a4df58 100644
--- a/run_rally_test.py
+++ b/run_rally_test.py
@@ -1,25 +1,15 @@
import os
-import re
import sys
-import time
-import yaml
import json
import pprint
import os.path
import argparse
import datetime
-import warnings
-import functools
-import contextlib
import multiprocessing
-from rally import exceptions
-from rally.cmd import cliutils
-from rally.cmd.main import categories
-from rally.benchmark.scenarios.vm.utils import VMScenario
-from rally.benchmark.scenarios.vm.vmtasks import VMTasks
-
-from ssh_copy_directory import put_dir_recursively, ssh_copy_file
+import io_scenario
+import rally_runner
+from itest import IOPerfTest
def log(x):
@@ -28,229 +18,30 @@
sys.stderr.write(pref + x.replace("\n", "\n" + pref) + "\n")
-@contextlib.contextmanager
-def patch_VMTasks_boot_runcommand_delete():
+def run_io_test(tool,
+ script_args,
+ test_runner,
+ keep_temp_files=False):
- try:
- orig = VMTasks.boot_runcommand_delete
- except AttributeError:
- # rally code was changed
- log("VMTasks class was changed and have no boot_runcommand_delete"
- " method anymore. Update patch code.")
- raise exceptions.ScriptError("monkeypatch code fails on "
- "VMTasks.boot_runcommand_delete")
-
- @functools.wraps(orig)
- def new_boot_runcommand_delete(self, *args, **kwargs):
- if 'rally_affinity_group' in os.environ:
- group_id = os.environ['rally_affinity_group']
- kwargs['scheduler_hints'] = {'group': group_id}
- return orig(self, *args, **kwargs)
-
- VMTasks.boot_runcommand_delete = new_boot_runcommand_delete
-
- try:
- yield
- finally:
- VMTasks.boot_runcommand_delete = orig
-
-
-def get_barrier(count):
- val = multiprocessing.Value('i', count)
- cond = multiprocessing.Condition()
-
- def closure(timeout):
- me_released = False
- with cond:
- val.value -= 1
- if val.value == 0:
- me_released = True
- cond.notify_all()
- else:
- cond.wait(timeout)
- return val.value == 0
-
- if me_released:
- log("Test begins!")
-
- return closure
-
-
-# should actually use mock module for this,
-# but don't wanna to add new dependency
-@contextlib.contextmanager
-def patch_VMScenario_run_command_over_ssh(paths,
- on_result_cb,
- barrier=None,
- latest_start_time=None):
-
- try:
- orig = VMScenario.run_action
- except AttributeError:
- # rally code was changed
- log("VMScenario class was changed and have no run_action"
- " method anymore. Update patch code.")
- raise exceptions.ScriptError("monkeypatch code fails on "
- "VMScenario.run_action")
-
- @functools.wraps(orig)
- def closure(self, ssh, *args, **kwargs):
- try:
- sftp = ssh._client.open_sftp()
- except AttributeError:
- # rally code was changed
- log("Prototype of VMScenario.run_command_over_ssh "
- "was changed. Update patch code.")
- raise exceptions.ScriptError("monkeypatch code fails on "
- "ssh._client.open_sftp()")
- try:
- for src, dst in paths.items():
- try:
- if os.path.isfile(src):
- ssh_copy_file(sftp, src, dst)
- elif os.path.isdir(src):
- put_dir_recursively(sftp, src, dst)
- else:
- templ = "Can't copy {0!r} - " + \
- "it neither a file not a directory"
- msg = templ.format(src)
- log(msg)
- raise exceptions.ScriptError(msg)
- except exceptions.ScriptError:
- raise
- except Exception as exc:
- tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
- msg = tmpl.format(src, dst, exc)
- log(msg)
- raise exceptions.ScriptError(msg)
- finally:
- sftp.close()
-
- log("Start io test")
-
- if barrier is not None:
- if latest_start_time is not None:
- timeout = latest_start_time - time.time()
- else:
- timeout = None
-
- if timeout is not None and timeout > 0:
- msg = "Ready and waiting on barrier. " + \
- "Will wait at most {0} seconds"
- log(msg.format(int(timeout)))
-
- if not barrier(timeout):
- log("Barrier timeouted")
-
- try:
- code, out, err = orig(self, ssh, *args, **kwargs)
- except Exception as exc:
- log("Rally raises exception {0}".format(exc.message))
- raise
-
- if 0 != code:
- templ = "Script returns error! code={0}\n {1}"
- log(templ.format(code, err.rstrip()))
- else:
- log("Test finished")
-
- try:
- for line in out.split("\n"):
- if line.strip() != "":
- result = json.loads(line)
- on_result_cb(result)
- except Exception as err:
- log("Error during postprocessing results: {0!r}".format(err))
-
- # result = {"rally": 0, "srally": 1}
- result = {"rally": 0}
- out = json.dumps(result)
-
- return code, out, err
-
- VMScenario.run_action = closure
-
- try:
- yield
- finally:
- VMScenario.run_action = orig
-
-
-def run_rally(rally_args):
- return cliutils.run(['rally'] + rally_args, categories)
-
-
-def prepare_files(testtool_py_args_v, dst_testtool_path, files_dir):
-
- # we do need temporary named files
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- py_file = os.tmpnam()
- yaml_file = os.tmpnam()
-
- testtool_py_inp_path = os.path.join(files_dir, "io.py")
- py_src_cont = open(testtool_py_inp_path).read()
- args_repl_rr = r'INSERT_TOOL_ARGS\(sys\.argv.*?\)'
- py_dst_cont = re.sub(args_repl_rr, repr(testtool_py_args_v), py_src_cont)
-
- if py_dst_cont == args_repl_rr:
- templ = "Can't find replace marker in file {0}"
- msg = templ.format(testtool_py_inp_path)
- log(msg)
- raise ValueError(msg)
-
- yaml_src_cont = open(os.path.join(files_dir, "io.yaml")).read()
- task_params = yaml.load(yaml_src_cont)
- rcd_params = task_params['VMTasks.boot_runcommand_delete']
- rcd_params[0]['args']['script'] = py_file
- yaml_dst_cont = yaml.dump(task_params)
-
- open(py_file, "w").write(py_dst_cont)
- open(yaml_file, "w").write(yaml_dst_cont)
-
- return yaml_file, py_file
-
-
-def run_test(tool, testtool_py_args_v, dst_testtool_path, files_dir,
- rally_extra_opts, max_preparation_time=300,
- keet_temp_files=False):
+ files_dir = os.path.dirname(io_scenario.__file__)
path = 'iozone' if 'iozone' == tool else 'fio'
- testtool_local = os.path.join(files_dir, path)
- yaml_file, py_file = prepare_files(testtool_py_args_v,
- dst_testtool_path,
- files_dir)
- try:
- config = yaml.load(open(yaml_file).read())
+ src_testtool_path = os.path.join(files_dir, path)
- vm_sec = 'VMTasks.boot_runcommand_delete'
- concurrency = config[vm_sec][0]['runner']['concurrency']
- copy_files = {testtool_local: dst_testtool_path}
+ result_queue = multiprocessing.Queue()
- result_queue = multiprocessing.Queue()
- results_cb = result_queue.put
+ obj = IOPerfTest(script_args,
+ src_testtool_path,
+ result_queue.put,
+ keep_temp_files)
- do_patch1 = patch_VMScenario_run_command_over_ssh
+ test_runner(obj)
- barrier = get_barrier(concurrency)
- max_release_time = time.time() + max_preparation_time
+ test_result = []
+ while not result_queue.empty():
+ test_result.append(result_queue.get())
- with patch_VMTasks_boot_runcommand_delete():
- with do_patch1(copy_files, results_cb, barrier, max_release_time):
- opts = ['task', 'start', yaml_file] + list(rally_extra_opts)
- log("Start rally with opts '{0}'".format(" ".join(opts)))
- run_rally(opts)
-
- rally_result = []
- while not result_queue.empty():
- rally_result.append(result_queue.get())
-
- return rally_result
-
- finally:
- if not keet_temp_files:
- os.unlink(yaml_file)
- os.unlink(py_file)
+ return test_result
def parse_args(argv):
@@ -262,7 +53,7 @@
action='store_true', default=False,
help="print some extra log info")
parser.add_argument("-o", "--io-opts", dest='io_opts',
- nargs="*", default=[],
+ required=True,
help="cmd line options for io.py")
parser.add_argument("-t", "--test-directory", help="directory with test",
dest="test_directory", required=True)
@@ -270,7 +61,7 @@
type=int, dest="max_preparation_time")
parser.add_argument("-k", "--keep", default=False,
help="keep temporary files",
- dest="keet_temp_files", action='store_true')
+ dest="keep_temp_files", action='store_true')
parser.add_argument("--rally-extra-opts", dest="rally_extra_opts",
default="", help="rally extra options")
@@ -279,7 +70,6 @@
def main(argv):
opts = parse_args(argv)
- dst_testtool_path = '/tmp/io_tool'
if not opts.extra_logs:
global log
@@ -288,43 +78,23 @@
pass
log = nolog
-
- if opts.io_opts == []:
- testtool_py_args_v = []
-
- block_sizes = ["4k", "64k"]
- ops = ['randwrite']
- iodepths = ['8']
- syncs = [True]
-
- for block_size in block_sizes:
- for op in ops:
- for iodepth in iodepths:
- for sync in syncs:
- tt_argv = ['--type', opts.tool_type,
- '-a', op,
- '--iodepth', iodepth,
- '--blocksize', block_size,
- '--iosize', '20M']
- if sync:
- tt_argv.append('-s')
- testtool_py_args_v.append(tt_argv)
else:
- testtool_py_args_v = []
- for o in opts.io_opts:
- ttopts = [opt.strip() for opt in o.split(" ") if opt.strip() != ""]
- testtool_py_args_v.append(ttopts)
+ rally_runner.log = log
- for io_argv_list in testtool_py_args_v:
- io_argv_list.extend(['--binary-path', dst_testtool_path])
+ script_args = [opt.strip()
+ for opt in opts.io_opts.split(" ")
+ if opt.strip() != ""]
- res = run_test(opts.tool_type,
- testtool_py_args_v,
- dst_testtool_path,
- files_dir=opts.test_directory,
- rally_extra_opts=opts.rally_extra_opts.split(" "),
- max_preparation_time=opts.max_preparation_time,
- keet_temp_files=opts.keet_temp_files)
+ runner = rally_runner.get_rally_runner(
+ files_dir=os.path.dirname(io_scenario.__file__),
+ rally_extra_opts=opts.rally_extra_opts.split(" "),
+ max_preparation_time=opts.max_preparation_time,
+ keep_temp_files=opts.keep_temp_files)
+
+ res = run_io_test(opts.tool_type,
+ script_args,
+ runner,
+ opts.keep_temp_files)
print "=" * 80
print pprint.pformat(res)
@@ -352,14 +122,16 @@
return 0
-# ubuntu cloud image
-# https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
-# glance image-create --name 'ubuntu' --disk-format qcow2
-# --container-format bare --is-public true --copy-from
-# https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
-# nova flavor-create ceph.512 ceph.512 512 50 1
-# nova server-group-create --policy anti-affinity ceph
+ostack_prepare = """
+glance image-create --name 'ubuntu' --disk-format qcow2
+--container-format bare --is-public true --copy-from
+https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
+
+nova flavor-create ceph.512 ceph.512 512 50 1
+nova server-group-create --policy anti-affinity ceph
+"""
+
if __name__ == '__main__':
exit(main(sys.argv[1:]))
diff --git a/scripts/data.py b/scripts/data.py
new file mode 100644
index 0000000..dfe97a2
--- /dev/null
+++ b/scripts/data.py
@@ -0,0 +1,115 @@
+import re
+import sys
+import json
+
+splitter_rr = "(?ms)=====+\n"
+
+
+def get_data_from_output(fname):
+ results = {}
+ fc = open(fname).read()
+
+ for block in re.split(splitter_rr, fc):
+ block = block.strip()
+ if not block.startswith("[{u'__meta__':"):
+ continue
+ for val in eval(block):
+ meta = val['__meta__']
+ meta['sync'] = 's' if meta['sync'] else 'a'
+ key = "{action} {sync} {blocksize}k".format(**meta)
+ results.setdefault(key, []).append(val['bw_mean'])
+
+ processed_res = {}
+
+ for k, v in results.items():
+ v.sort()
+ med = float(sum(v)) / len(v)
+ ran = sum(abs(x - med) for x in v) / len(v)
+ processed_res[k] = (int(med), int(ran))
+
+ return meta, processed_res
+
+
+def ksort(x):
+ op, sync, sz = x.split(" ")
+ return (op, sync, int(sz[:-1]))
+
+
+def create_json_results(meta, file_data):
+ row = {"build_id": "",
+ "type": "",
+ "iso_md5": ""}
+ row.update(file_data)
+ return json.dumps(row)
+
+
+def show_data(*pathes):
+ begin = "| {:>10} {:>5} {:>5}"
+ first_file_templ = " | {:>6} ~ {:>5} {:>2}% {:>5}"
+ other_file_templ = " | {:>6} ~ {:>5} {:>2}% {:>5} ---- {:>6}%"
+
+ line_templ = begin + first_file_templ + \
+ other_file_templ * (len(pathes) - 1) + " |"
+
+ header_ln = line_templ.replace("<", "^").replace(">", "^")
+
+ params = ["Oper", "Sync", "BSZ", "BW1", "DEV1", "%", "IOPS1"]
+ for pos in range(1, len(pathes)):
+ params += "BW{0}+DEV{0}+%+IOPS{0}+DIFF %".format(pos).split("+")
+
+ header_ln = header_ln.format(*params)
+
+ sep = '-' * len(header_ln)
+
+ results = [get_data_from_output(path)[1] for path in pathes]
+
+ print sep
+ print header_ln
+ print sep
+
+ prev_tp = None
+
+ common_keys = set(results[0].keys())
+ for result in results[1:]:
+ common_keys &= set(result.keys())
+
+ for k in sorted(common_keys, key=ksort):
+ tp = k.rsplit(" ", 1)[0]
+ op, s, sz = k.split(" ")
+ s = 'sync' if s == 's' else 'async'
+
+ if tp != prev_tp and prev_tp is not None:
+ print sep
+
+ prev_tp = tp
+
+ results0 = results[0]
+ m0, d0 = results0[k]
+ iops0 = m0 / int(sz[:-1])
+ perc0 = int(d0 * 100.0 / m0 + 0.5)
+
+ data = [op, s, sz, m0, d0, perc0, iops0]
+
+ for result in results[1:]:
+ m, d = result[k]
+ iops = m / int(sz[:-1])
+ perc = int(d * 100.0 / m + 0.5)
+ avg_diff = int(((m - m0) * 100.) / m + 0.5)
+ data.extend([m, d, perc, iops, avg_diff])
+
+ print line_templ.format(*data)
+
+ print sep
+
+
+def main(argv):
+ path1 = argv[0]
+ if path1 == '--json':
+ print create_json_results(*get_data_from_output(argv[1]))
+ else:
+ show_data(*argv)
+ return 0
+
+if __name__ == "__main__":
+ exit(main(sys.argv[1:]))
+# print " ", results[k]
diff --git a/scripts/data_extractor.py b/scripts/data_extractor.py
new file mode 100644
index 0000000..d655f17
--- /dev/null
+++ b/scripts/data_extractor.py
@@ -0,0 +1,196 @@
+import sys
+import json
+import sqlite3
+import contextlib
+
+
+def connect(url):
+ return sqlite3.connect(url)
+
+
+create_db_sql_templ = """
+CREATE TABLE build (id integer primary key,
+ build text,
+ type text,
+ md5 text);
+
+CREATE TABLE params_combination (id integer primary key, {params});
+CREATE TABLE param (id integer primary key, name text, type text);
+
+CREATE TABLE result (build_id integer,
+ params_combination integer,
+ bandwith float,
+ deviation float);
+"""
+
+
+PARAM_COUNT = 20
+
+
+def get_all_tables(conn):
+ cursor = conn.cursor()
+ cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
+ return cursor.fetchall()
+
+
+def drop_database(conn):
+ cursor = conn.cursor()
+ cursor.execute("drop table result")
+ cursor.execute("drop table params_combination")
+ cursor.execute("drop table build")
+ cursor.execute("drop table param")
+
+
+def init_database(conn):
+ cursor = conn.cursor()
+
+ params = ["param_{0} text".format(i) for i in range(PARAM_COUNT)]
+ create_db_sql = create_db_sql_templ.format(params=",".join(params))
+
+ for sql in create_db_sql.split(";"):
+ cursor.execute(sql)
+
+
+def insert_io_params(conn):
+ sql = """insert into param (name, type) values ('operation',
+ '{write,randwrite,read,randread}');
+ insert into param (name, type) values ('sync', '{a,s}');
+ insert into param (name, type) values ('block_size', 'size_kmg');
+ """
+
+ for insert in sql.split(";"):
+ conn.execute(insert)
+
+
+def insert_build(cursor, build_id, build_type, iso_md5):
+ cursor.execute("insert into build (build, type, md5) values (?, ?, ?)",
+ (build_id, build_type, iso_md5))
+ return cursor.lastrowid
+
+
+def insert_params(cursor, *param_vals):
+ param_vals = param_vals + ("",) * (PARAM_COUNT - len(param_vals))
+
+ params = ",".join(['?'] * PARAM_COUNT)
+ select_templ = "select id from params_combination where {params_where}"
+
+ params_where = ["param_{0}=?".format(i) for i in range(PARAM_COUNT)]
+ req = select_templ.format(params_where=" AND ".join(params_where))
+ cursor.execute(req, param_vals)
+ res = cursor.fetchall()
+ if [] != res:
+ return res[0][0]
+
+ params = ",".join(['?'] * PARAM_COUNT)
+ param_insert_templ = "insert into params_combination ({0}) values ({1})"
+ param_names = ",".join("param_{0}".format(i) for i in range(PARAM_COUNT))
+ req = param_insert_templ.format(param_names, params)
+ cursor.execute(req, param_vals)
+ return cursor.lastrowid
+
+
+def insert_results(cursor, build_id, params_id, bw, dev):
+ req = "insert into result values (?, ?, ?, ?)"
+ cursor.execute(req, (build_id, params_id, bw, dev))
+
+
+@contextlib.contextmanager
+def transaction(conn):
+ try:
+ cursor = conn.cursor()
+ yield cursor
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+
+
+def json_to_db(json_data, conn):
+ data = json.loads(json_data)
+ with transaction(conn) as cursor:
+ for build_data in data:
+ build_id = insert_build(cursor,
+ build_data.pop("build_id"),
+ build_data.pop("type"),
+ build_data.pop("iso_md5"))
+
+ for params, (bw, dev) in build_data.items():
+ param_id = insert_params(cursor, *params.split(" "))
+ insert_results(cursor, build_id, param_id, bw, dev)
+
+
+def to_db():
+ conn = sqlite3.connect(sys.argv[1])
+ json_data = open(sys.argv[2]).read()
+
+ if len(get_all_tables(conn)) == 0:
+ init_database(conn)
+
+ json_to_db(json_data, conn)
+
+
+def ssize_to_kb(ssize):
+ try:
+ smap = dict(k=1, K=1, M=1024, m=1024, G=1024**2, g=1024**2)
+ for ext, coef in smap.items():
+ if ssize.endswith(ext):
+ return int(ssize[:-1]) * coef
+
+ if int(ssize) % 1024 != 0:
+ raise ValueError()
+
+ return int(ssize) / 1024
+
+ except (ValueError, TypeError, AttributeError):
+ tmpl = "Unknow size format {0!r} (or size not multiples 1024)"
+ raise ValueError(tmpl.format(ssize))
+
+
+def load_slice(cursor, build_id, y_param, **params):
+ params_id = {}
+ for param in list(params) + [y_param]:
+ cursor.execute("select id from param where name=?", (param,))
+ params_id[param] = cursor.fetchone()
+
+ sql = """select params_combination.param_{0}, result.bandwith
+ from params_combination, result
+ where result.build_id=?""".format(params_id[y_param])
+
+ for param, val in params.items():
+ pid = params_id[param]
+ sql += " and params_combination.param_{0}='{1}'".format(pid, val)
+
+ cursor.execute(sql)
+
+
+def from_db():
+ conn = sqlite3.connect(sys.argv[1])
+ # sql = sys.argv[2]
+ cursor = conn.cursor()
+
+ sql = """select params_combination.param_2, result.bandwith
+ from params_combination, result
+ where params_combination.param_0="write"
+ and params_combination.param_1="s"
+ and params_combination.id=result.params_combination
+ and result.build_id=60"""
+
+ cursor.execute(sql)
+ data = []
+
+ for (sz, bw) in cursor.fetchall():
+ data.append((ssize_to_kb(sz), sz, bw))
+
+ data.sort()
+
+ import matplotlib.pyplot as plt
+ xvals = range(len(data))
+ plt.plot(xvals, [dt[2] for dt in data])
+ plt.ylabel('bandwith')
+ plt.xlabel('block size')
+ plt.xticks(xvals, [dt[1] for dt in data])
+ plt.show()
+
+
+from_db()
diff --git a/data_generator.py b/scripts/data_generator.py
similarity index 100%
rename from data_generator.py
rename to scripts/data_generator.py
diff --git a/run.sh b/scripts/run.sh
old mode 100755
new mode 100644
similarity index 100%
rename from run.sh
rename to scripts/run.sh
diff --git a/scripts/run_2.sh b/scripts/run_2.sh
new file mode 100644
index 0000000..7bd92fb
--- /dev/null
+++ b/scripts/run_2.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+set -x
+set -e
+
+type="iozone"
+
+io_opts="--type $type -a write --iodepth 16 --blocksize 1m --iosize x20"
+python run_rally_test.py -l -o "$io_opts" -t io-scenario $type --rally-extra-opts="--deployment $1"
diff --git a/run_tests.sh b/scripts/run_tests.sh
similarity index 100%
rename from run_tests.sh
rename to scripts/run_tests.sh
diff --git a/scripts/starts_vms.py b/scripts/starts_vms.py
new file mode 100644
index 0000000..b52c2d5
--- /dev/null
+++ b/scripts/starts_vms.py
@@ -0,0 +1,240 @@
+import re
+import os
+import time
+
+import paramiko
+from novaclient.client import Client as n_client
+from cinderclient.v1.client import Client as c_client
+
+
+def ostack_get_creds():
+ env = os.environ.get
+ name = env('OS_USERNAME')
+ passwd = env('OS_PASSWORD')
+ tenant = env('OS_TENANT_NAME')
+ auth_url = env('OS_AUTH_URL')
+ return name, passwd, tenant, auth_url
+
+
+def nova_connect():
+ return n_client('1.1', *ostack_get_creds())
+
+
+def create_keypair(nova, name, key_path):
+ with open(key_path) as key:
+ return nova.keypairs.create(name, key.read())
+
+
+def create_volume(size, name=None, volid=[0]):
+ cinder = c_client(*ostack_get_creds())
+ name = 'ceph-test-{0}'.format(volid[0])
+ volid[0] = volid[0] + 1
+ vol = cinder.volumes.create(size=size, display_name=name)
+ err_count = 0
+ while vol.status != 'available':
+ if vol.status == 'error':
+ if err_count == 3:
+ print "Fail to create volume"
+ raise RuntimeError("Fail to create volume")
+ else:
+ err_count += 1
+ cinder.volumes.delete(vol)
+ time.sleep(1)
+ vol = cinder.volumes.create(size=size, display_name=name)
+ continue
+ time.sleep(1)
+ vol = cinder.volumes.get(vol.id)
+ return vol
+
+
+def wait_for_server_active(nova, server, timeout=240):
+ t = time.time()
+ while True:
+ time.sleep(5)
+ sstate = getattr(server, 'OS-EXT-STS:vm_state').lower()
+
+ if sstate == 'active':
+ return True
+
+ print "Curr state is", sstate, "waiting for active"
+
+ if sstate == 'error':
+ return False
+
+ if time.time() - t > timeout:
+ return False
+
+ server = nova.servers.get(server)
+
+
+def get_or_create_floating_ip(nova, pool, used_ip):
+ ip_list = nova.floating_ips.list()
+
+ if pool is not None:
+ ip_list = [ip for ip in ip_list if ip.pool == pool]
+
+ ip_list = [ip for ip in ip_list if ip.instance_id is None]
+ ip_list = [ip for ip in ip_list if ip.ip not in used_ip]
+
+ if len(ip_list) > 0:
+ return ip_list[0]
+ else:
+ return nova.floating_ips.create(pool)
+
+
+def create_vms(nova, amount, keypair_name, img_name,
+ flavor_name, vol_sz, network_zone_name=None):
+
+ network = nova.networks.find(label=network_zone_name)
+ nics = [{'net-id': network.id}]
+ fl = nova.flavors.find(name=flavor_name)
+ img = nova.images.find(name=img_name)
+ srvs = []
+ counter = 0
+
+ for i in range(3):
+ amount_left = amount - len(srvs)
+
+ new_srvs = []
+ for i in range(amount_left):
+ print "creating server"
+ srv = nova.servers.create("ceph-test-{0}".format(counter),
+ flavor=fl, image=img, nics=nics,
+ key_name=keypair_name)
+ counter += 1
+ new_srvs.append(srv)
+ print srv
+
+ deleted_servers = []
+ for srv in new_srvs:
+ if not wait_for_server_active(nova, srv):
+ print "Server", srv.name, "fails to start. Kill it and",
+ print " try again"
+
+ nova.servers.delete(srv)
+ deleted_servers.append(srv)
+ else:
+ srvs.append(srv)
+
+ if len(deleted_servers) != 0:
+ time.sleep(5)
+
+ if len(srvs) != amount:
+ print "ERROR: can't start required amount of servers. Exit"
+ raise RuntimeError("Fail to create {0} servers".format(amount))
+
+ result = {}
+ for srv in srvs:
+ print "wait till server be ready"
+ wait_for_server_active(nova, srv)
+ print "creating volume"
+ vol = create_volume(vol_sz)
+ print "attach volume to server"
+ nova.volumes.create_server_volume(srv.id, vol.id, None)
+ print "create floating ip"
+ flt_ip = get_or_create_floating_ip(nova, 'net04_ext', result.keys())
+ print "attaching ip to server"
+ srv.add_floating_ip(flt_ip)
+ result[flt_ip.ip] = srv
+
+ return result
+
+
+def clear_all(nova):
+ deleted_srvs = set()
+ for srv in nova.servers.list():
+ if re.match(r"ceph-test-\d+", srv.name):
+ print "Deleting server", srv.name
+ nova.servers.delete(srv)
+ deleted_srvs.add(srv.id)
+
+ while deleted_srvs != set():
+ print "Waiting till all servers are actually deleted"
+ all_id = set(srv.id for srv in nova.servers.list())
+ if all_id.intersection(deleted_srvs) == set():
+ print "Done, deleting volumes"
+ break
+ time.sleep(1)
+
+ # wait till vm actually deleted
+
+ cinder = c_client(*ostack_get_creds())
+ for vol in cinder.volumes.list():
+ if isinstance(vol.display_name, basestring):
+ if re.match(r'ceph-test-\d+', vol.display_name):
+ if vol.status in ('available', 'error'):
+ print "Deleting volume", vol.display_name
+ cinder.volumes.delete(vol)
+
+ print "Clearing done (yet some volumes may still deleting)"
+
+
+def wait_ssh_ready(host, user, key_file, retry_count=10, timeout=5):
+ ssh = paramiko.SSHClient()
+ ssh.load_host_keys('/dev/null')
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ ssh.known_hosts = None
+
+ for i in range(retry_count):
+ try:
+ ssh.connect(host, username=user, key_filename=key_file,
+ look_for_keys=False)
+ break
+ except:
+ if i == retry_count - 1:
+ raise
+ time.sleep(timeout)
+
+
+# def prepare_host(key_file, ip, fio_path, dst_fio_path, user='cirros'):
+# print "Wait till ssh ready...."
+# wait_ssh_ready(ip, user, key_file)
+
+# print "Preparing host >"
+# print " Coping fio"
+# copy_fio(key_file, ip, fio_path, user, dst_fio_path)
+
+# key_opts = '-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no'
+# args = (key_file, user, ip, key_opts)
+# cmd_format = "ssh {3} -i {0} {1}@{2} '{{0}}'".format(*args).format
+
+# def exec_on_host(cmd):
+# print " " + cmd
+# subprocess.check_call(cmd_format(cmd), shell=True)
+
+# exec_on_host("sudo /usr/sbin/mkfs.ext4 /dev/vdb")
+# exec_on_host("sudo /bin/mkdir /media/ceph")
+# exec_on_host("sudo /bin/mount /dev/vdb /media/ceph")
+# exec_on_host("sudo /bin/chmod a+rwx /media/ceph")
+
+
+def main():
+ image_name = 'TestVM'
+ flavor_name = 'ceph'
+ vol_sz = 50
+ network_zone_name = 'net04'
+ amount = 10
+ keypair_name = 'ceph-test'
+
+ nova = nova_connect()
+ clear_all(nova)
+
+ try:
+ ips = []
+ params = dict(vol_sz=vol_sz)
+ params['image_name'] = image_name
+ params['flavor_name'] = flavor_name
+ params['network_zone_name'] = network_zone_name
+ params['amount'] = amount
+ params['keypair_name'] = keypair_name
+
+ for ip, host in create_vms(nova, **params).items():
+ ips.append(ip)
+
+ print "All setup done! Ips =", " ".join(ips)
+ print "Starting tests"
+ finally:
+ clear_all(nova)
+
+if __name__ == "__main__":
+ exit(main())
diff --git a/ssh_copy_directory.py b/ssh_copy_directory.py
index dca0843..d074fcf 100644
--- a/ssh_copy_directory.py
+++ b/ssh_copy_directory.py
@@ -66,3 +66,25 @@
remfile = os.path.join(remroot, f)
localfile = os.path.join(root, f)
ssh_copy_file(sftp, localfile, remfile, preserve_perm)
+
+
+def copy_paths(conn, paths):
+ sftp = conn.open_sftp()
+ try:
+ for src, dst in paths.items():
+ try:
+ if os.path.isfile(src):
+ ssh_copy_file(sftp, src, dst)
+ elif os.path.isdir(src):
+ put_dir_recursively(sftp, src, dst)
+ else:
+ templ = "Can't copy {0!r} - " + \
+ "it neither a file not a directory"
+ msg = templ.format(src)
+ raise OSError(msg)
+ except Exception as exc:
+ tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
+ msg = tmpl.format(src, dst, exc)
+ raise OSError(msg)
+ finally:
+ sftp.close()
diff --git a/utils.py b/utils.py
new file mode 100644
index 0000000..8941ca5
--- /dev/null
+++ b/utils.py
@@ -0,0 +1,17 @@
+import multiprocessing
+
+
+def get_barrier(count):
+ val = multiprocessing.Value('i', count)
+ cond = multiprocessing.Condition()
+
+ def closure(timeout):
+ with cond:
+ val.value -= 1
+ if val.value == 0:
+ cond.notify_all()
+ else:
+ cond.wait(timeout)
+ return val.value == 0
+
+ return closure