2.0 ready
diff --git a/scripts/prepare.sh b/scripts/prepare.sh
index e8813f1..1f47f20 100644
--- a/scripts/prepare.sh
+++ b/scripts/prepare.sh
@@ -34,12 +34,14 @@
fi
done
- echo -n "Looking for keypair $KEYPAIR_NAME ... "
- export keypair_id=$(nova keypair-list | grep " $KEYPAIR_NAME " | awk '{print $2}' )
- if [ ! -z "$keypair_id" ] ; then
- echo " Found"
- else
- echo " Not Found"
+ if [ ! -z "$KEYPAIR_NAME" ] ; then
+ echo -n "Looking for keypair $KEYPAIR_NAME ... "
+ export keypair_id=$(nova keypair-list | grep " $KEYPAIR_NAME " | awk '{print $2}' )
+ if [ ! -z "$keypair_id" ] ; then
+ echo " Found"
+ else
+ echo " Not Found"
+ fi
fi
echo -n "Looking for security group $SECGROUP ... "
@@ -97,7 +99,7 @@
IMAGE_FILE="/tmp/${IMAGE_NAME}.qcow"
if [ ! -f "$IMAGE_FILE" ] ; then
- curl "$IMAGE_URL" -o "$IMAGE_FILE" >/dev/null
+ curl "$IMAGE_URL" -o "$IMAGE_FILE" 2>&1 >/dev/null
fi
opts="--disk-format qcow2 --container-format bare --is-public true"
glance image-create --name "$IMAGE_NAME" $opts --file "$IMAGE_FILE" >/dev/null
@@ -116,10 +118,12 @@
export groups_ids="$groups_ids $group_id"
done
- if [ -z "$keypair_id" ] ; then
- echo "Creating server group $SERV_GROUP. Key would be stored into $KEY_FILE_NAME"
- nova keypair-add "$KEYPAIR_NAME" > "$KEY_FILE_NAME"
- chmod og= "$KEY_FILE_NAME"
+ if [ ! -z "$KEYPAIR_NAME" ] ; then
+ if [ -z "$keypair_id" ] ; then
+ echo "Creating server group $SERV_GROUP. Key would be stored into $KEY_FILE_NAME"
+ nova keypair-add "$KEYPAIR_NAME" > "$KEY_FILE_NAME"
+ chmod og= "$KEY_FILE_NAME"
+ fi
fi
if [ -z "$secgroup_id" ] ; then
diff --git a/wally/webui.py b/scripts/webui.py
similarity index 100%
rename from wally/webui.py
rename to scripts/webui.py
diff --git a/wally/__main__.py b/wally/__main__.py
index 97011d9..fd31f16 100644
--- a/wally/__main__.py
+++ b/wally/__main__.py
@@ -1,5 +1,5 @@
import sys
-from .run_test import main
+from .main import main
if __name__ == '__main__':
diff --git a/wally/config.py b/wally/config.py
index c760509..abc6bb0 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -13,8 +13,6 @@
import pretty_yaml
-cfg_dict = {}
-
class NoData(object):
@classmethod
@@ -23,6 +21,10 @@
class Config(object):
+ def __init__(self, val=None):
+ if val is not None:
+ self.update(val)
+
def get(self, name, defval=None):
obj = self.__dict__
for cname in name.split("."):
@@ -32,14 +34,8 @@
return defval
return obj
-
-cfg = Config()
-cfg.__dict__ = cfg_dict
-
-
-def mkdirs_if_unxists(path):
- if not os.path.exists(path):
- os.makedirs(path)
+ def update(self, val):
+ self.__dict__.update(val)
def get_test_files(results_dir):
@@ -55,71 +51,80 @@
log_file='log.txt',
sensor_storage='sensor_storage',
nodes_report_file='nodes.yaml',
- results='results',
+ results_storage='results',
hwinfo_directory='hwinfo',
hwreport_fname='hwinfo.txt',
raw_results='raw_results.yaml')
res = dict((k, in_var_dir(v)) for k, v in res.items())
- res['var_dir'] = results_dir
+ res['results_dir'] = results_dir
return res
-def load_config(file_name, explicit_folder=None):
- cfg_dict.update(yaml.load(open(file_name).read()))
+def load_config(file_name):
+ file_name = os.path.abspath(file_name)
- var_dir = cfg_dict.get('internal', {}).get('var_dir_root', '/tmp')
- run_uuid = None
+ defaults = dict(
+ sensors_remote_path='/tmp/sensors',
+ testnode_log_root='/tmp/wally',
+ settings={}
+ )
- if explicit_folder is None:
+ raw_cfg = yaml.load(open(file_name).read())
+ raw_cfg['config_folder'] = os.path.dirname(file_name)
+ if 'include' in raw_cfg:
+ default_path = os.path.join(raw_cfg['config_folder'],
+ raw_cfg.pop('include'))
+ default_cfg = yaml.load(open(default_path).read())
+
+ # TODO: Need a more intelligent config merge?
+ default_cfg.update(raw_cfg)
+ raw_cfg = default_cfg
+
+ cfg = Config(defaults)
+ cfg.update(raw_cfg)
+
+ results_storage = cfg.settings.get('results_storage', '/tmp')
+ results_storage = os.path.abspath(results_storage)
+
+ existing = file_name.startswith(results_storage)
+
+ if existing:
+ cfg.results_dir = os.path.dirname(file_name)
+ cfg.run_uuid = os.path.basename(cfg.results_dir)
+ else:
+ # generate result folder name
for i in range(10):
- run_uuid = pet_generate(2, "_")
- results_dir = os.path.join(var_dir, run_uuid)
- if not os.path.exists(results_dir):
+ cfg.run_uuid = pet_generate(2, "_")
+ cfg.results_dir = os.path.join(results_storage,
+ cfg.run_uuid)
+ if not os.path.exists(cfg.results_dir):
break
else:
- run_uuid = str(uuid.uuid4())
- results_dir = os.path.join(var_dir, run_uuid)
- cfg_dict['run_uuid'] = run_uuid.replace('_', '-')
- else:
- if not os.path.isdir(explicit_folder):
- ex2 = os.path.join(var_dir, explicit_folder)
- if os.path.isdir(ex2):
- explicit_folder = ex2
- else:
- raise RuntimeError("No such directory " + explicit_folder)
+ cfg.run_uuid = str(uuid.uuid4())
+ cfg.results_dir = os.path.join(results_storage,
+ cfg.run_uuid)
- results_dir = explicit_folder
+ # setup all files paths
+ cfg.update(get_test_files(cfg.results_dir))
- cfg_dict.update(get_test_files(results_dir))
- mkdirs_if_unxists(cfg_dict['var_dir'])
+ if existing:
+ cfg.update(load_run_params(cfg.run_params_file))
- if explicit_folder is not None:
- cfg_dict.update(load_run_params(cfg_dict['run_params_file']))
- run_uuid = cfg_dict['run_uuid']
-
- mkdirs_if_unxists(cfg_dict['sensor_storage'])
-
- if 'sensors_remote_path' not in cfg_dict:
- cfg_dict['sensors_remote_path'] = '/tmp/sensors'
-
- testnode_log_root = cfg_dict.get('testnode_log_root', '/var/wally')
+ testnode_log_root = cfg.get('testnode_log_root')
testnode_log_dir = os.path.join(testnode_log_root, "{0}/{{name}}")
- cfg_dict['default_test_local_folder'] = \
- testnode_log_dir.format(cfg_dict['run_uuid'])
+ cfg.default_test_local_folder = testnode_log_dir.format(cfg.run_uuid)
- mkdirs_if_unxists(cfg_dict['results'])
- mkdirs_if_unxists(cfg_dict['hwinfo_directory'])
-
- return results_dir
+ return cfg
-def save_run_params():
+def save_run_params(cfg):
params = {
- 'comment': cfg_dict['comment'],
- 'run_uuid': cfg_dict['run_uuid']
+ 'comment': cfg.comment,
+ 'run_uuid': cfg.run_uuid
}
- with open(cfg_dict['run_params_file'], 'w') as fd:
+
+ with open(cfg.run_params_file, 'w') as fd:
fd.write(pretty_yaml.dumps(params))
diff --git a/wally/main.py b/wally/main.py
new file mode 100644
index 0000000..c25d0b6
--- /dev/null
+++ b/wally/main.py
@@ -0,0 +1,303 @@
+import os
+import sys
+import time
+import signal
+import logging
+import argparse
+import functools
+import contextlib
+
+from yaml import load as _yaml_load
+
+try:
+ from yaml import CLoader
+ yaml_load = functools.partial(_yaml_load, Loader=CLoader)
+except ImportError:
+ yaml_load = _yaml_load
+
+
+import texttable
+
+try:
+ import faulthandler
+except ImportError:
+ faulthandler = None
+
+
+from wally.timeseries import SensorDatastore
+from wally import utils, run_test, pretty_yaml
+from wally.config import (load_config, setup_loggers,
+ get_test_files, save_run_params, load_run_params)
+
+
+logger = logging.getLogger("wally")
+
+
+class Context(object):
+ def __init__(self):
+ self.build_meta = {}
+ self.nodes = []
+ self.clear_calls_stack = []
+ self.openstack_nodes_ids = []
+ self.sensors_mon_q = None
+ self.hw_info = []
+
+
+def get_stage_name(func):
+ nm = get_func_name(func)
+ if nm.endswith("stage"):
+ return nm
+ else:
+ return nm + " stage"
+
+
+def get_test_names(raw_res):
+ res = []
+ for tp, data in raw_res:
+ if not isinstance(data, list):
+ raise ValueError()
+
+ keys = []
+ for dt in data:
+ if not isinstance(dt, dict):
+ raise ValueError()
+
+ keys.append(",".join(dt.keys()))
+
+ res.append(tp + "(" + ",".join(keys) + ")")
+ return res
+
+
+def list_results(path):
+ results = []
+
+ for dname in os.listdir(path):
+ try:
+ files_cfg = get_test_files(os.path.join(path, dname))
+
+ if not os.path.isfile(files_cfg['raw_results']):
+ continue
+
+ mt = os.path.getmtime(files_cfg['raw_results'])
+ res_mtime = time.ctime(mt)
+
+ raw_res = yaml_load(open(files_cfg['raw_results']).read())
+ test_names = ",".join(sorted(get_test_names(raw_res)))
+
+ params = load_run_params(files_cfg['run_params_file'])
+
+ comm = params.get('comment')
+ results.append((mt, dname, test_names, res_mtime,
+ '-' if comm is None else comm))
+ except ValueError:
+ pass
+
+ tab = texttable.Texttable(max_width=200)
+ tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
+ tab.set_cols_align(["l", "l", "l", "l"])
+ results.sort()
+
+ for data in results[::-1]:
+ tab.add_row(data[1:])
+
+ tab.header(["Name", "Tests", "etime", "Comment"])
+
+ print(tab.draw())
+
+
+def get_func_name(obj):
+ if hasattr(obj, '__name__'):
+ return obj.__name__
+ if hasattr(obj, 'func_name'):
+ return obj.func_name
+ return obj.func.func_name
+
+
+@contextlib.contextmanager
+def log_stage(func):
+ msg_templ = "Exception during {0}: {1!s}"
+ msg_templ_no_exc = "During {0}"
+
+ logger.info("Start " + get_stage_name(func))
+
+ try:
+ yield
+ except utils.StopTestError as exc:
+ logger.error(msg_templ.format(
+ get_func_name(func), exc))
+ except Exception:
+ logger.exception(msg_templ_no_exc.format(
+ get_func_name(func)))
+
+
+def make_storage_dir_struct(cfg):
+ utils.mkdirs_if_unxists(cfg.results_dir)
+ utils.mkdirs_if_unxists(cfg.sensor_storage)
+ utils.mkdirs_if_unxists(cfg.hwinfo_directory)
+ utils.mkdirs_if_unxists(cfg.results_storage)
+
+
+def log_nodes_statistic_stage(_, ctx):
+ utils.log_nodes_statistic(ctx.nodes)
+
+
+def parse_args(argv):
+ descr = "Disk io performance test suite"
+ parser = argparse.ArgumentParser(prog='wally', description=descr)
+ parser.add_argument("-l", '--log-level',
+ help="print some extra log info")
+
+ subparsers = parser.add_subparsers(dest='subparser_name')
+
+ # ---------------------------------------------------------------------
+ ls_help = 'list all results'
+ ls_parser = subparsers.add_parser('ls', help=ls_help)
+ ls_parser.add_argument("result_storage", help="Folder with test results")
+
+ # ---------------------------------------------------------------------
+ compare_help = 'compare two results'
+ compare_parser = subparsers.add_parser('compare', help=compare_help)
+ compare_parser.add_argument("data_path1", help="First folder with test results")
+ compare_parser.add_argument("data_path2", help="Second folder with test results")
+
+ # ---------------------------------------------------------------------
+ report_help = 'run report on previously obtained results'
+ report_parser = subparsers.add_parser('report', help=report_help)
+ report_parser.add_argument('--load_report', action='store_true')
+ report_parser.add_argument("data_dir", help="folder with rest results")
+
+ # ---------------------------------------------------------------------
+ test_parser = subparsers.add_parser('test', help='run tests')
+ test_parser.add_argument('--build-description',
+ type=str, default="Build info")
+ test_parser.add_argument('--build-id', type=str, default="id")
+ test_parser.add_argument('--build-type', type=str, default="GA")
+ test_parser.add_argument('-n', '--no-tests', action='store_true',
+ help="Don't run tests", default=False)
+ test_parser.add_argument('--load_report', action='store_true')
+ test_parser.add_argument("-k", '--keep-vm', action='store_true',
+ help="Don't remove test vm's", default=False)
+ test_parser.add_argument("-d", '--dont-discover-nodes', action='store_true',
+ help="Don't connect/discover fuel nodes",
+ default=False)
+ test_parser.add_argument('--no-report', action='store_true',
+ help="Skip report stages", default=False)
+ test_parser.add_argument("comment", help="Test information")
+ test_parser.add_argument("config_file", help="Yaml config file")
+
+ # ---------------------------------------------------------------------
+
+ return parser.parse_args(argv[1:])
+
+
+def main(argv):
+ if faulthandler is not None:
+ faulthandler.register(signal.SIGUSR1, all_threads=True)
+
+ opts = parse_args(argv)
+ stages = []
+ report_stages = []
+
+ ctx = Context()
+ ctx.results = {}
+ ctx.sensors_data = SensorDatastore()
+
+ if opts.subparser_name == 'test':
+ cfg = load_config(opts.config_file)
+ make_storage_dir_struct(cfg)
+ cfg.comment = opts.comment
+ save_run_params(cfg)
+
+ with open(cfg.saved_config_file, 'w') as fd:
+ fd.write(pretty_yaml.dumps(cfg.__dict__))
+
+ stages = [
+ run_test.discover_stage
+ ]
+
+ stages.extend([
+ run_test.reuse_vms_stage,
+ log_nodes_statistic_stage,
+ run_test.save_nodes_stage,
+ run_test.connect_stage])
+
+ if cfg.settings.get('collect_info', True):
+ stages.append(run_test.collect_hw_info_stage)
+
+ stages.extend([
+ # deploy_sensors_stage,
+ run_test.run_tests_stage,
+ run_test.store_raw_results_stage,
+ # gather_sensors_stage
+ ])
+
+ cfg.keep_vm = opts.keep_vm
+ cfg.no_tests = opts.no_tests
+ cfg.dont_discover_nodes = opts.dont_discover_nodes
+
+ ctx.build_meta['build_id'] = opts.build_id
+ ctx.build_meta['build_descrption'] = opts.build_description
+ ctx.build_meta['build_type'] = opts.build_type
+
+ elif opts.subparser_name == 'ls':
+ list_results(opts.result_storage)
+ return 0
+
+ elif opts.subparser_name == 'report':
+ cfg = load_config(get_test_files(opts.data_dir)['saved_config_file'])
+ stages.append(run_test.load_data_from(opts.data_dir))
+ opts.no_report = False
+ # load build meta
+
+ elif opts.subparser_name == 'compare':
+ x = run_test.load_data_from_path(opts.data_path1)
+ y = run_test.load_data_from_path(opts.data_path2)
+ print(run_test.IOPerfTest.format_diff_for_console(
+ [x['io'][0], y['io'][0]]))
+ return 0
+
+ if not opts.no_report:
+ report_stages.append(run_test.console_report_stage)
+ if opts.load_report:
+ report_stages.append(run_test.test_load_report_stage)
+ report_stages.append(run_test.html_report_stage)
+
+ if opts.log_level is not None:
+ str_level = opts.log_level
+ else:
+ str_level = cfg.settings.get('log_level', 'INFO')
+
+ setup_loggers(getattr(logging, str_level), cfg.log_file)
+ logger.info("All info would be stored into " + cfg.results_dir)
+
+ for stage in stages:
+ ok = False
+ with log_stage(stage):
+ stage(cfg, ctx)
+ ok = True
+ if not ok:
+ break
+
+ exc, cls, tb = sys.exc_info()
+ for stage in ctx.clear_calls_stack[::-1]:
+ with log_stage(stage):
+ stage(cfg, ctx)
+
+ logger.debug("Start utils.cleanup")
+ for clean_func, args, kwargs in utils.iter_clean_func():
+ with log_stage(clean_func):
+ clean_func(*args, **kwargs)
+
+ if exc is None:
+ for report_stage in report_stages:
+ with log_stage(report_stage):
+ report_stage(cfg, ctx)
+
+ logger.info("All info stored into " + cfg.results_dir)
+
+ if exc is None:
+ logger.info("Tests finished successfully")
+ return 0
+ else:
+ logger.error("Tests are failed. See detailed error above")
+ return 1
diff --git a/wally/report.py b/wally/report.py
index 39950a4..8b83d5d 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -9,6 +9,7 @@
try:
import numpy
import scipy
+ import matplotlib
import matplotlib.pyplot as plt
except ImportError:
plt = None
@@ -353,6 +354,8 @@
log_lat=False,
boxplots=False,
latv_50=None, latv_95=None):
+
+ matplotlib.rcParams.update({'font.size': 10})
points = " MiBps" if legend == 'BW' else ""
lc = len(concurence)
width = 0.35
@@ -443,10 +446,12 @@
if use_bw:
data = [x.bw.average / 1000 for x in chart_data]
data_dev = [x.bw.confidence / 1000 for x in chart_data]
+ # data_dev = [x.bw.deviation / 1000 for x in chart_data]
name = "BW"
else:
data = [x.iops.average for x in chart_data]
data_dev = [x.iops.confidence for x in chart_data]
+ # data_dev = [x.iops.deviation for x in chart_data]
name = "IOPS"
fc = io_chart(title=desc,
@@ -541,6 +546,7 @@
setattr(di, 'rws4k_{}ms'.format(tlat), 0)
elif pos == len(latv):
iops3, _, _ = rws4k_iops_lat_th[-1]
+ iops3 = int(round_3_digit(iops3))
setattr(di, 'rws4k_{}ms'.format(tlat), ">=" + str(iops3))
else:
lat1 = latv[pos - 1]
@@ -554,7 +560,8 @@
th_iops_coef = (iops2 - iops1) / (th2 - th1)
iops3 = th_iops_coef * (th3 - th1) + iops1
- setattr(di, 'rws4k_{}ms'.format(tlat), int(iops3))
+ iops3 = int(round_3_digit(iops3))
+ setattr(di, 'rws4k_{}ms'.format(tlat), iops3)
hdi = DiskInfo()
diff --git a/wally/run_test.py b/wally/run_test.py
index 4484d45..43e4e43 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -1,13 +1,7 @@
-from __future__ import print_function
-
import os
import re
-import sys
import time
-import pprint
-import signal
import logging
-import argparse
import functools
import contextlib
import collections
@@ -20,23 +14,12 @@
except ImportError:
yaml_load = _yaml_load
-
-import texttable
-
-try:
- import faulthandler
-except ImportError:
- faulthandler = None
-
from concurrent.futures import ThreadPoolExecutor
-from wally import pretty_yaml
from wally.hw_info import get_hw_info
+from wally.config import get_test_files
from wally.discover import discover, Node
-from wally.timeseries import SensorDatastore
-from wally import utils, report, ssh_utils, start_vms
-from wally.config import (cfg_dict, load_config, setup_loggers,
- get_test_files, save_run_params, load_run_params)
+from wally import pretty_yaml, utils, report, ssh_utils, start_vms
from wally.sensors_utils import with_sensors_util, sensors_info_util
from wally.suits.mysql import MysqlTest
@@ -52,65 +35,65 @@
}
-try:
- from wally import webui
-except ImportError:
- webui = None
-
-
logger = logging.getLogger("wally")
-def format_result(res, formatter):
- data = "\n{0}\n".format("=" * 80)
- data += pprint.pformat(res) + "\n"
- data += "{0}\n".format("=" * 80)
- templ = "{0}\n\n====> {1}\n\n{2}\n\n"
- return templ.format(data, formatter(res), "=" * 80)
+def connect_all(nodes, spawned_node=False):
+ """
+ Connect to all nodes, log errors
+ nodes:[Node] - list of nodes
+ spawned_node:bool - whether the nodes are newly spawned VMs
+ """
-
-class Context(object):
- def __init__(self):
- self.build_meta = {}
- self.nodes = []
- self.clear_calls_stack = []
- self.openstack_nodes_ids = []
- self.sensors_mon_q = None
- self.hw_info = []
-
-
-def connect_one(node, vm=False):
- if node.conn_url == 'local':
- node.connection = ssh_utils.connect(node.conn_url)
- return
-
- try:
- ssh_pref = "ssh://"
- if node.conn_url.startswith(ssh_pref):
- url = node.conn_url[len(ssh_pref):]
-
- if vm:
- conn_timeout = 240
- else:
- conn_timeout = 30
-
- node.connection = ssh_utils.connect(url,
- conn_timeout=conn_timeout)
- else:
- raise ValueError("Unknown url type {0}".format(node.conn_url))
- except Exception as exc:
- # logger.exception("During connect to " + node.get_conn_id())
- msg = "During connect to {0}: {1!s}".format(node.get_conn_id(),
- exc)
- logger.error(msg)
- node.connection = None
-
-
-def connect_all(nodes, vm=False):
logger.info("Connecting to nodes")
+
+ conn_timeout = 240 if spawned_node else 30
+
+ def connect_ext(conn_url):
+ try:
+ return ssh_utils.connect(conn_url, conn_timeout=conn_timeout)
+ except Exception as exc:
+ logger.error("During connect to {0}: {1!s}".format(conn_url, exc))
+ return None
+
+ urls = []
+ ssh_pref = "ssh://"
+
+ for node in nodes:
+ if node.conn_url == 'local':
+ urls.append(node.conn_url)
+ elif node.conn_url.startswith(ssh_pref):
+ urls.append(node.conn_url[len(ssh_pref):])
+ else:
+ msg = "Unknown url type {0}".format(node.conn_url)
+ logger.error(msg)
+ raise utils.StopTestError(msg)
+
with ThreadPoolExecutor(32) as pool:
- connect_one_f = functools.partial(connect_one, vm=vm)
- list(pool.map(connect_one_f, nodes))
+ for node, conn in zip(nodes, pool.map(connect_ext, urls)):
+ node.connection = conn
+
+ failed_testnodes = []
+ failed_nodes = []
+
+ for node in nodes:
+ if node.connection is None:
+ if 'testnode' in node.roles:
+ failed_testnodes.append(node.get_conn_id())
+ else:
+ failed_nodes.append(node.get_conn_id())
+
+ if failed_nodes != []:
+ msg = "Node(s) {0} would be excluded - can't connect"
+ logger.warning(msg.format(",".join(failed_nodes)))
+
+ if failed_testnodes != []:
+ msg = "Can't connect to testnode(s) " + ",".join(failed_testnodes)
+ logger.error(msg)
+ raise utils.StopTestError(msg)
+
+ if len(failed_nodes) == 0:
+ logger.info("All nodes connected successfully")
def collect_hw_info_stage(cfg, ctx):
@@ -132,7 +115,7 @@
if info.hostname is not None:
fname = os.path.join(
- cfg_dict['hwinfo_directory'],
+ cfg.hwinfo_directory,
info.hostname + "_lshw.xml")
with open(fname, "w") as fd:
@@ -141,22 +124,8 @@
logger.debug("Raw hardware info in " + cfg['hwinfo_directory'] + " folder")
-def run_single_test(test_nodes, name, params, log_directory,
- test_local_folder, run_uuid):
-
- test_cls = TOOL_TYPE_MAPPER[name]
- test_cfg = TestConfig(test_cls.__name__,
- params=params,
- test_uuid=run_uuid,
- nodes=test_nodes,
- log_directory=log_directory,
- remote_dir=test_local_folder.format(name=name))
-
- test = test_cls(test_cfg)
- return test.run()
-
-
-def suspend_vm_nodes(unused_nodes):
+@contextlib.contextmanager
+def suspend_vm_nodes_ctx(unused_nodes):
pausable_nodes_ids = [node.os_vm_id for node in unused_nodes
if node.os_vm_id is not None]
non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
@@ -170,15 +139,38 @@
len(pausable_nodes_ids)))
start_vms.pause(pausable_nodes_ids)
- return pausable_nodes_ids
+ try:
+ yield pausable_nodes_ids
+ finally:
+ if len(pausable_nodes_ids) != 0:
+ logger.debug("Unpausing {0} nodes".format(
+ len(pausable_nodes_ids)))
+ start_vms.unpause(pausable_nodes_ids)
+
+
+def generate_result_dir_name(results, name, params):
+ # pick a unique directory name for the test results
+ all_tests_dirs = os.listdir(results)
+
+ if 'name' in params:
+ dir_name = "{0}_{1}".format(name, params['name'])
+ else:
+ for idx in range(len(all_tests_dirs) + 1):
+ dir_name = "{0}_{1}".format(name, idx)
+ if dir_name not in all_tests_dirs:
+ break
+ else:
+ raise utils.StopTestError("Can't select directory for test results")
+
+ return os.path.join(results, dir_name)
def run_tests(cfg, test_block, nodes):
- test_nodes = [node for node in nodes
- if 'testnode' in node.roles]
-
- not_test_nodes = [node for node in nodes
- if 'testnode' not in node.roles]
+ """
+ Run tests from a test block
+ """
+ test_nodes = [node for node in nodes if 'testnode' in node.roles]
+ not_test_nodes = [node for node in nodes if 'testnode' not in node.roles]
if len(test_nodes) == 0:
logger.error("No test nodes found")
@@ -186,15 +178,23 @@
for name, params in test_block.items():
results = []
- limit = params.get('node_limit')
+
+ # iterate over all node counts
+ limit = params.get('node_limit', len(test_nodes))
if isinstance(limit, (int, long)):
vm_limits = [limit]
- elif limit is None:
- vm_limits = [len(test_nodes)]
else:
+ list_or_tpl = isinstance(limit, (tuple, list))
+ all_ints = list_or_tpl and all(isinstance(climit, (int, long))
+ for climit in limit)
+ if not all_ints:
+ msg = "'node_limit' parameter ion config should" + \
+ "be either int or list if integers, not {0!r}".format(limit)
+ raise ValueError(msg)
vm_limits = limit
for vm_count in vm_limits:
+ # select test nodes
if vm_count == 'all':
curr_test_nodes = test_nodes
unused_nodes = []
@@ -205,111 +205,72 @@
if 0 == len(curr_test_nodes):
continue
- # make a directory for results
- all_tests_dirs = os.listdir(cfg_dict['results'])
+ results_path = generate_result_dir_name(cfg.results_storage, name, params)
+ utils.mkdirs_if_unxists(results_path)
- if 'name' in params:
- dir_name = "{0}_{1}".format(name, params['name'])
+ # suspend all unused virtual nodes
+ if cfg.settings.get('suspend_unused_vms', True):
+ suspend_ctx = suspend_vm_nodes_ctx(unused_nodes)
else:
- for idx in range(len(all_tests_dirs) + 1):
- dir_name = "{0}_{1}".format(name, idx)
- if dir_name not in all_tests_dirs:
- break
- else:
- raise utils.StopTestError(
- "Can't select directory for test results")
+ suspend_ctx = utils.empty_ctx()
- dir_path = os.path.join(cfg_dict['results'], dir_name)
- if not os.path.exists(dir_path):
- os.mkdir(dir_path)
+ with suspend_ctx:
+ resumable_nodes_ids = [node.os_vm_id for node in curr_test_nodes
+ if node.os_vm_id is not None]
- if cfg.get('suspend_unused_vms', True):
- pausable_nodes_ids = suspend_vm_nodes(unused_nodes)
+ if len(resumable_nodes_ids) != 0:
+ logger.debug("Check and unpause {0} nodes".format(
+ len(resumable_nodes_ids)))
+ start_vms.unpause(resumable_nodes_ids)
- resumable_nodes_ids = [node.os_vm_id for node in curr_test_nodes
- if node.os_vm_id is not None]
-
- if len(resumable_nodes_ids) != 0:
- logger.debug("Check and unpause {0} nodes".format(
- len(resumable_nodes_ids)))
- start_vms.unpause(resumable_nodes_ids)
-
- try:
sens_nodes = curr_test_nodes + not_test_nodes
with sensors_info_util(cfg, sens_nodes) as sensor_data:
- t_start = time.time()
- res = run_single_test(curr_test_nodes,
- name,
- params,
- dir_path,
- cfg['default_test_local_folder'],
- cfg['run_uuid'])
- t_end = time.time()
- finally:
- if cfg.get('suspend_unused_vms', True):
- if len(pausable_nodes_ids) != 0:
- logger.debug("Unpausing {0} nodes".format(
- len(pausable_nodes_ids)))
- start_vms.unpause(pausable_nodes_ids)
+ test_cls = TOOL_TYPE_MAPPER[name]
+ remote_dir = cfg.default_test_local_folder.format(name=name)
+
+ test_cfg = TestConfig(test_cls.__name__,
+ params=params,
+ test_uuid=cfg.run_uuid,
+ nodes=curr_test_nodes,
+ log_directory=results_path,
+ remote_dir=remote_dir)
+
+ t_start = time.time()
+ res = test_cls(test_cfg).run()
+ t_end = time.time()
+
+ # save sensor data
if sensor_data is not None:
fname = "{0}_{1}.csv".format(int(t_start), int(t_end))
- fpath = os.path.join(cfg['sensor_storage'], fname)
+ fpath = os.path.join(cfg.sensor_storage, fname)
with open(fpath, "w") as fd:
fd.write("\n\n".join(sensor_data))
- results.extend(res)
+ results.append(res)
yield name, results
-def log_nodes_statistic(_, ctx):
- nodes = ctx.nodes
- logger.info("Found {0} nodes total".format(len(nodes)))
- per_role = collections.defaultdict(lambda: 0)
- for node in nodes:
- for role in node.roles:
- per_role[role] += 1
-
- for role, count in sorted(per_role.items()):
- logger.debug("Found {0} nodes with role {1}".format(count, role))
-
-
def connect_stage(cfg, ctx):
ctx.clear_calls_stack.append(disconnect_stage)
connect_all(ctx.nodes)
-
- all_ok = True
-
- for node in ctx.nodes:
- if node.connection is None:
- if 'testnode' in node.roles:
- msg = "Can't connect to testnode {0}"
- msg = msg.format(node.get_conn_id())
- logger.error(msg)
- raise utils.StopTestError(msg)
- else:
- msg = "Node {0} would be excluded - can't connect"
- logger.warning(msg.format(node.get_conn_id()))
- all_ok = False
-
- if all_ok:
- logger.info("All nodes connected successfully")
-
- ctx.nodes = [node for node in ctx.nodes
- if node.connection is not None]
+ ctx.nodes = [node for node in ctx.nodes if node.connection is not None]
def discover_stage(cfg, ctx):
+ """
+ discover clusters and nodes stage
+ """
if cfg.get('discover') is not None:
- discover_objs = [i.strip() for i in cfg['discover'].strip().split(",")]
+ discover_objs = [i.strip() for i in cfg.discover.strip().split(",")]
nodes = discover(ctx,
discover_objs,
- cfg['clouds'],
- cfg['var_dir'],
- not cfg['dont_discover_nodes'])
+ cfg.clouds,
+ cfg.results_storage,
+ not cfg.dont_discover_nodes)
ctx.nodes.extend(nodes)
@@ -327,16 +288,18 @@
if len(roles) != 0:
cluster[node.conn_url] = roles
- with open(cfg['nodes_report_file'], "w") as fd:
+ with open(cfg.nodes_report_file, "w") as fd:
fd.write(pretty_yaml.dumps(cluster))
def reuse_vms_stage(cfg, ctx):
- p = cfg.get('clouds', {}).get('openstack', {}).get('vms', [])
+ vms_patterns = cfg.get('clouds', {}).get('openstack', {}).get('vms', [])
+ private_key_path = get_vm_keypair(cfg)['keypair_file_private']
- for creds in p:
- vm_name_pattern, conn_pattern = creds.split(",")
+ for creds in vms_patterns:
+ user_name, vm_name_pattern = creds.split("@", 1)
msg = "Vm like {0} lookup failed".format(vm_name_pattern)
+
with utils.log_error(msg):
msg = "Looking for vm with name like {0}".format(vm_name_pattern)
logger.debug(msg)
@@ -348,7 +311,10 @@
conn = start_vms.nova_connect(**os_creds)
for ip, vm_id in start_vms.find_vms(conn, vm_name_pattern):
- node = Node(conn_pattern.format(ip=ip), ['testnode'])
+ conn_url = "ssh://{user}@{ip}::{key}".format(user=user_name,
+ ip=ip,
+ key=private_key_path)
+ node = Node(conn_url, ['testnode'])
node.os_vm_id = vm_id
ctx.nodes.append(node)
@@ -357,8 +323,8 @@
creds = None
tenant = None
- if 'openstack' in cfg['clouds']:
- os_cfg = cfg['clouds']['openstack']
+ if 'openstack' in cfg.clouds:
+ os_cfg = cfg.clouds['openstack']
if 'OPENRC' in os_cfg:
logger.info("Using OS credentials from " + os_cfg['OPENRC'])
user, passwd, tenant, auth_url = utils.get_creds_openrc(os_cfg['OPENRC'])
@@ -372,8 +338,8 @@
passwd = os_cfg['OS_PASSWORD'].strip()
auth_url = os_cfg['OS_AUTH_URL'].strip()
- if tenant is None and 'fuel' in cfg['clouds'] and \
- 'openstack_env' in cfg['clouds']['fuel'] and \
+ if tenant is None and 'fuel' in cfg.clouds and \
+ 'openstack_env' in cfg.clouds['fuel'] and \
ctx.fuel_openstack_creds is not None:
logger.info("Using fuel creds")
creds = ctx.fuel_openstack_creds
@@ -388,31 +354,57 @@
'auth_url': auth_url}
logger.debug("OS_CREDS: user={name} tenant={tenant} auth_url={auth_url}".format(**creds))
+
return creds
+def get_vm_keypair(cfg):
+ res = {}
+ for field, ext in (('keypair_file_private', 'pem'),
+ ('keypair_file_public', 'pub')):
+ fpath = cfg.vm_configs.get(field)
+
+ if fpath is None:
+ fpath = cfg.vm_configs['keypair_name'] + "." + ext
+
+ if os.path.isabs(fpath):
+ res[field] = fpath
+ else:
+ res[field] = os.path.join(cfg.config_folder, fpath)
+ return res
+
+
@contextlib.contextmanager
def create_vms_ctx(ctx, cfg, config, already_has_count=0):
- params = cfg['vm_configs'][config['cfg_name']].copy()
+ if config['count'].startswith('='):
+ count = int(config['count'][1:])
+ if count <= already_has_count:
+ logger.debug("Not need new vms")
+ yield []
+ return
+
+ params = cfg.vm_configs[config['cfg_name']].copy()
os_nodes_ids = []
if not start_vms.is_connected():
os_creds = get_OS_credentials(cfg, ctx)
else:
os_creds = {}
- start_vms.nova_connect(**os_creds)
+
+ nova = start_vms.nova_connect(**os_creds)
params.update(config)
- if 'keypair_file_private' not in params:
- params['keypair_file_private'] = params['keypair_name'] + ".pem"
+ params.update(get_vm_keypair(cfg))
- params['group_name'] = cfg_dict['run_uuid']
+ params['group_name'] = cfg.run_uuid
+ params['keypair_name'] = cfg.vm_configs['keypair_name']
if not config.get('skip_preparation', False):
logger.info("Preparing openstack")
- start_vms.prepare_os_subpr(params=params, **os_creds)
+ start_vms.prepare_os_subpr(nova, params=params, **os_creds)
new_nodes = []
+ old_nodes = ctx.nodes[:]
try:
for new_node, node_id in start_vms.launch_vms(params, already_has_count):
new_node.roles.append('testnode')
@@ -426,17 +418,15 @@
yield new_nodes
finally:
- if not cfg['keep_vm']:
+ if not cfg.keep_vm:
shut_down_vms_stage(cfg, ctx)
+ ctx.nodes = old_nodes
def run_tests_stage(cfg, ctx):
ctx.results = collections.defaultdict(lambda: [])
- if 'tests' not in cfg:
- return
-
- for group in cfg['tests']:
+ for group in cfg.get('tests', []):
if len(group.items()) != 1:
msg = "Items in tests section should have len == 1"
@@ -451,33 +441,36 @@
logger.error(msg)
raise utils.StopTestError(msg)
- num_test_nodes = sum(1 for node in ctx.nodes
- if 'testnode' in node.roles)
+ num_test_nodes = 0
+ for node in ctx.nodes:
+ if 'testnode' in node.roles:
+ num_test_nodes += 1
vm_ctx = create_vms_ctx(ctx, cfg, config['openstack'],
num_test_nodes)
- with vm_ctx as new_nodes:
- if len(new_nodes) != 0:
- logger.debug("Connecting to new nodes")
- connect_all(new_nodes, True)
+ tests = config.get('tests', [])
+ else:
+ vm_ctx = utils.empty_ctx([])
+ tests = [group]
- for node in new_nodes:
- if node.connection is None:
- msg = "Failed to connect to vm {0}"
- raise RuntimeError(msg.format(node.get_conn_id()))
+ if cfg.get('sensors') is None:
+ sensor_ctx = utils.empty_ctx()
+ else:
+ sensor_ctx = with_sensors_util(cfg.get('sensors'), ctx.nodes)
- with with_sensors_util(cfg_dict, ctx.nodes):
- for test_group in config.get('tests', []):
+ with vm_ctx as new_nodes:
+ if len(new_nodes) != 0:
+ connect_all(new_nodes, True)
+
+ if not cfg.no_tests:
+ for test_group in tests:
+ with sensor_ctx:
for tp, res in run_tests(cfg, test_group, ctx.nodes):
ctx.results[tp].extend(res)
- else:
- with with_sensors_util(cfg_dict, ctx.nodes):
- for tp, res in run_tests(cfg, group, ctx.nodes):
- ctx.results[tp].extend(res)
def shut_down_vms_stage(cfg, ctx):
- vm_ids_fname = cfg_dict['vm_ids_fname']
+ vm_ids_fname = cfg.vm_ids_fname
if ctx.openstack_nodes_ids is None:
nodes_ids = open(vm_ids_fname).read().split()
else:
@@ -493,12 +486,12 @@
def store_nodes_in_log(cfg, nodes_ids):
- with open(cfg['vm_ids_fname'], 'w') as fd:
+ with open(cfg.vm_ids_fname, 'w') as fd:
fd.write("\n".join(nodes_ids))
def clear_enviroment(cfg, ctx):
- if os.path.exists(cfg_dict['vm_ids_fname']):
+ if os.path.exists(cfg.vm_ids_fname):
shut_down_vms_stage(cfg, ctx)
@@ -511,28 +504,29 @@
def store_raw_results_stage(cfg, ctx):
-
- raw_results = cfg_dict['raw_results']
-
- if os.path.exists(raw_results):
- cont = yaml_load(open(raw_results).read())
+ if os.path.exists(cfg.raw_results):
+ cont = yaml_load(open(cfg.raw_results).read())
else:
cont = []
cont.extend(utils.yamable(ctx.results).items())
raw_data = pretty_yaml.dumps(cont)
- with open(raw_results, "w") as fd:
+ with open(cfg.raw_results, "w") as fd:
fd.write(raw_data)
def console_report_stage(cfg, ctx):
first_report = True
- text_rep_fname = cfg['text_report_file']
+ text_rep_fname = cfg.text_report_file
with open(text_rep_fname, "w") as fd:
for tp, data in ctx.results.items():
if 'io' == tp and data is not None:
- rep = IOPerfTest.format_for_console(data)
+ rep_lst = []
+ for result in data:
+ rep_lst.append(
+ IOPerfTest.format_for_console(list(result)))
+ rep = "\n\n".join(rep_lst)
elif tp in ['mysql', 'pgbench'] and data is not None:
rep = MysqlTest.format_for_console(data)
else:
@@ -550,7 +544,7 @@
def test_load_report_stage(cfg, ctx):
- load_rep_fname = cfg['load_report_file']
+ load_rep_fname = cfg.load_report_file
found = False
for idx, (tp, data) in enumerate(ctx.results.items()):
if 'io' == tp and data is not None:
@@ -564,49 +558,33 @@
def html_report_stage(cfg, ctx):
- html_rep_fname = cfg['html_report_file']
+ html_rep_fname = cfg.html_report_file
found = False
for tp, data in ctx.results.items():
if 'io' == tp and data is not None:
- if found:
+ if found or len(data) > 1:
logger.error("Making reports for more than one " +
"io block isn't supported! All " +
"report, except first are skipped")
continue
found = True
- report.make_io_report(data,
+ report.make_io_report(list(data[0]),
cfg.get('comment', ''),
html_rep_fname,
lab_info=ctx.hw_info)
-def complete_log_nodes_statistic(cfg, ctx):
- nodes = ctx.nodes
- for node in nodes:
- logger.debug(str(node))
+def load_data_from_path(test_res_dir):
+ files = get_test_files(test_res_dir)
+ raw_res = yaml_load(open(files['raw_results']).read())
+ res = collections.defaultdict(lambda: [])
+ for tp, test_lists in raw_res:
+ for tests in test_lists:
+ for suite_name, suite_data in tests.items():
+ result_folder = suite_data[0]
+ res[tp].append(TOOL_TYPE_MAPPER[tp].load(suite_name, result_folder))
-def load_data_from_file(var_dir, _, ctx):
- raw_results = os.path.join(var_dir, 'raw_results.yaml')
- ctx.results = {}
- for tp, results in yaml_load(open(raw_results).read()):
- cls = TOOL_TYPE_MAPPER[tp]
- ctx.results[tp] = map(cls.load, results)
-
-
-def load_data_from_path(var_dir):
- res_dir = os.path.join(var_dir, 'results')
- res = {}
- for dir_name in os.listdir(res_dir):
- dir_path = os.path.join(res_dir, dir_name)
- if not os.path.isdir(dir_path):
- continue
- rr = re.match(r"(?P<type>[a-z]+)_\d+$", dir_name)
- if rr is None:
- continue
- tp = rr.group('type')
- arr = res.setdefault(tp, [])
- arr.extend(TOOL_TYPE_MAPPER[tp].load(dir_path))
return res
@@ -617,230 +595,3 @@
def load_data_from(var_dir):
return functools.partial(load_data_from_path_stage, var_dir)
-
-
-def parse_args(argv):
- descr = "Disk io performance test suite"
- parser = argparse.ArgumentParser(prog='wally', description=descr)
-
- # subparsers = parser.add_subparsers()
- # test_parser = subparsers.add_parser('test', help='run tests')
-
- parser.add_argument("-l", dest='extra_logs',
- action='store_true', default=False,
- help="print some extra log info")
- parser.add_argument("-b", '--build_description',
- type=str, default="Build info")
- parser.add_argument("-i", '--build_id', type=str, default="id")
- parser.add_argument("-t", '--build_type', type=str, default="GA")
- parser.add_argument("-u", '--username', type=str, default="admin")
- parser.add_argument("-n", '--no-tests', action='store_true',
- help="Don't run tests", default=False)
- parser.add_argument("-p", '--post-process-only', metavar="VAR_DIR",
- help="Only process data from previour run")
- parser.add_argument("-x", '--xxx', action='store_true')
- parser.add_argument("-k", '--keep-vm', action='store_true',
- help="Don't remove test vm's", default=False)
- parser.add_argument("-d", '--dont-discover-nodes', action='store_true',
- help="Don't connect/discover fuel nodes",
- default=False)
- parser.add_argument("-r", '--no-html-report', action='store_true',
- help="Skip html report", default=False)
- parser.add_argument("--params", metavar="testname.paramname",
- help="Test params", default=[])
- parser.add_argument("--ls", action='store_true', default=False)
- parser.add_argument("-c", "--comment", default="")
- parser.add_argument("config_file")
-
- return parser.parse_args(argv[1:])
-
-
-def get_stage_name(func):
- nm = get_func_name(func)
- if nm.endswith("stage"):
- return nm
- else:
- return nm + " stage"
-
-
-def get_test_names(raw_res):
- res = set()
- for tp, data in raw_res:
- for block in data:
- res.add("{0}({1})".format(tp, block.get('test_name', '-')))
- return res
-
-
-def list_results(path):
- results = []
-
- for dname in os.listdir(path):
-
- files_cfg = get_test_files(os.path.join(path, dname))
-
- if not os.path.isfile(files_cfg['raw_results']):
- continue
-
- mt = os.path.getmtime(files_cfg['raw_results'])
- res_mtime = time.ctime(mt)
-
- raw_res = yaml_load(open(files_cfg['raw_results']).read())
- test_names = ",".join(sorted(get_test_names(raw_res)))
-
- params = load_run_params(files_cfg['run_params_file'])
-
- comm = params.get('comment')
- results.append((mt, dname, test_names, res_mtime,
- '-' if comm is None else comm))
-
- tab = texttable.Texttable(max_width=200)
- tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
- tab.set_cols_align(["l", "l", "l", "l"])
- results.sort()
-
- for data in results[::-1]:
- tab.add_row(data[1:])
-
- tab.header(["Name", "Tests", "etime", "Comment"])
-
- print(tab.draw())
-
-
-def get_func_name(obj):
- if hasattr(obj, '__name__'):
- return obj.__name__
- if hasattr(obj, 'func_name'):
- return obj.func_name
- return obj.func.func_name
-
-
-@contextlib.contextmanager
-def log_stage(func):
- msg_templ = "Exception during {0}: {1!s}"
- msg_templ_no_exc = "During {0}"
-
- logger.info("Start " + get_stage_name(func))
-
- try:
- yield
- except utils.StopTestError as exc:
- logger.error(msg_templ.format(
- get_func_name(func), exc))
- except Exception:
- logger.exception(msg_templ_no_exc.format(
- get_func_name(func)))
-
-
-def main(argv):
- if faulthandler is not None:
- faulthandler.register(signal.SIGUSR1, all_threads=True)
-
- opts = parse_args(argv)
-
- # x = load_data_from_path("/var/wally_results/uncorroborant_dinah")
- # y = load_data_from_path("/var/wally_results/nonmelting_jamal")
- # print(IOPerfTest.format_diff_for_console([x['io'], y['io']]))
- # exit(1)
-
- if opts.ls:
- list_results(opts.config_file)
- exit(0)
-
- data_dir = load_config(opts.config_file, opts.post_process_only)
-
- if opts.post_process_only is None:
- cfg_dict['comment'] = opts.comment
- save_run_params()
-
- if cfg_dict.get('logging', {}).get("extra_logs", False) or opts.extra_logs:
- # level = logging.DEBUG
- level = logging.INFO
- else:
- level = logging.WARNING
-
- setup_loggers(level, cfg_dict['log_file'])
-
- if not os.path.exists(cfg_dict['saved_config_file']):
- with open(cfg_dict['saved_config_file'], 'w') as fd:
- fd.write(open(opts.config_file).read())
-
- if opts.post_process_only is not None:
- stages = [
- load_data_from(data_dir)
- ]
- else:
- stages = [
- discover_stage
- ]
-
- stages.extend([
- reuse_vms_stage,
- log_nodes_statistic,
- save_nodes_stage,
- connect_stage])
-
- if cfg_dict.get('collect_info', True):
- stages.append(collect_hw_info_stage)
-
- stages.extend([
- # deploy_sensors_stage,
- run_tests_stage,
- store_raw_results_stage,
- # gather_sensors_stage
- ])
-
- report_stages = [
- console_report_stage,
- ]
-
- if opts.xxx:
- report_stages.append(test_load_report_stage)
- elif not opts.no_html_report:
- report_stages.append(html_report_stage)
-
- logger.info("All info would be stored into {0}".format(
- cfg_dict['var_dir']))
-
- ctx = Context()
- ctx.results = {}
- ctx.build_meta['build_id'] = opts.build_id
- ctx.build_meta['build_descrption'] = opts.build_description
- ctx.build_meta['build_type'] = opts.build_type
- ctx.build_meta['username'] = opts.username
- ctx.sensors_data = SensorDatastore()
-
- cfg_dict['keep_vm'] = opts.keep_vm
- cfg_dict['no_tests'] = opts.no_tests
- cfg_dict['dont_discover_nodes'] = opts.dont_discover_nodes
-
- for stage in stages:
- ok = False
- with log_stage(stage):
- stage(cfg_dict, ctx)
- ok = True
- if not ok:
- break
-
- exc, cls, tb = sys.exc_info()
- for stage in ctx.clear_calls_stack[::-1]:
- with log_stage(stage):
- stage(cfg_dict, ctx)
-
- logger.debug("Start utils.cleanup")
- for clean_func, args, kwargs in utils.iter_clean_func():
- with log_stage(clean_func):
- clean_func(*args, **kwargs)
-
- if exc is None:
- for report_stage in report_stages:
- with log_stage(report_stage):
- report_stage(cfg_dict, ctx)
-
- logger.info("All info stored in {0} folder".format(cfg_dict['var_dir']))
-
- if exc is None:
- logger.info("Tests finished successfully")
- return 0
- else:
- logger.error("Tests are failed. See detailed error above")
- return 1
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
index 6b50311..600f3bb 100644
--- a/wally/sensors_utils.py
+++ b/wally/sensors_utils.py
@@ -57,31 +57,27 @@
@contextlib.contextmanager
-def with_sensors_util(cfg, nodes):
- if 'sensors' not in cfg:
- yield
- return
-
+def with_sensors_util(sensors_cfg, nodes):
+ srp = sensors_cfg['sensors_remote_path']
monitored_nodes, sensors_configs, source2roles_map = \
- get_sensors_config_for_nodes(cfg['sensors'], nodes,
- cfg['sensors_remote_path'])
+ get_sensors_config_for_nodes(sensors_cfg, nodes, srp)
- with with_sensors(sensors_configs, cfg['sensors_remote_path']):
+ with with_sensors(sensors_configs, srp):
yield source2roles_map
@contextlib.contextmanager
def sensors_info_util(cfg, nodes):
- if 'sensors' not in cfg:
- yield None
+ if cfg.get('sensors', None) is None:
+ yield
return
_, sensors_configs, _ = \
- get_sensors_config_for_nodes(cfg['sensors'], nodes,
- cfg['sensors_remote_path'])
+ get_sensors_config_for_nodes(cfg.sensors, nodes,
+ cfg.sensors_remote_path)
clear_old_sensors(sensors_configs)
- ctx = sensors_info(sensors_configs, cfg['sensors_remote_path'])
+ ctx = sensors_info(sensors_configs, cfg.sensors_remote_path)
try:
res = ctx.__enter__()
yield res
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index bd73f05..e77b9a8 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -365,11 +365,12 @@
def connect(uri, **params):
if uri == 'local':
- return Local()
-
- creds = parse_ssh_uri(uri)
- creds.port = int(creds.port)
- return ssh_connect(creds, **params)
+ res = Local()
+ else:
+ creds = parse_ssh_uri(uri)
+ creds.port = int(creds.port)
+ res = ssh_connect(creds, **params)
+ return res
all_sessions_lock = threading.Lock()
diff --git a/wally/start_vms.py b/wally/start_vms.py
index 471996a..1457da6 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -1,5 +1,6 @@
import re
import os
+import stat
import time
import os.path
import logging
@@ -66,7 +67,8 @@
return CINDER_CONNECTION
-def prepare_os_subpr(params, name=None, passwd=None, tenant=None,
+def prepare_os_subpr(nova, params, name=None,
+ passwd=None, tenant=None,
auth_url=None):
if name is None:
name, passwd, tenant, auth_url = ostack_get_creds()
@@ -77,6 +79,7 @@
image_name = params['image']['name']
env = os.environ.copy()
+
env.update(dict(
OS_USERNAME=name,
OS_PASSWORD=passwd,
@@ -89,30 +92,35 @@
FLAVOR_CPU_COUNT=str(params['flavor']['cpu_count']),
SERV_GROUPS=serv_groups,
- KEYPAIR_NAME=params['keypair_name'],
SECGROUP=params['security_group'],
IMAGE_NAME=image_name,
- KEY_FILE_NAME=params['keypair_file_private'],
IMAGE_URL=params['image']['url'],
+
+ # KEYPAIR_NAME=params['keypair_name'],
+ # KEY_FILE_NAME=params['keypair_file_private'],
))
spath = os.path.dirname(os.path.dirname(wally.__file__))
spath = os.path.join(spath, 'scripts/prepare.sh')
- cmd = "bash {spath} >/dev/null".format(spath=spath)
+ cmd = "bash {spath} >/dev/null 2>&1 ".format(spath=spath)
subprocess.check_call(cmd, shell=True, env=env)
- conn = nova_connect(name, passwd, tenant, auth_url)
while True:
- status = conn.images.find(name=image_name).status
+ status = nova.images.find(name=image_name).status
if status == 'ACTIVE':
break
msg = "Image {0} is still in {1} state. Waiting 10 more seconds"
logger.info(msg.format(image_name, status))
time.sleep(10)
+ create_keypair(nova,
+ params['keypair_name'],
+ params['keypair_file_public'],
+ params['keypair_file_private'])
+
def find_vms(nova, name_prefix):
for srv in nova.servers.list():
@@ -182,6 +190,42 @@
create_flavor(nova, **params['flavor'])
+def create_keypair(nova, name, pub_key_path, priv_key_path):
+ pub_key_exists = os.path.exists(pub_key_path)
+ priv_key_exists = os.path.exists(priv_key_path)
+
+ try:
+ kpair = nova.keypairs.find(name=name)
+ # NotFound - keypair isn't registered in nova, create it below
+ except NotFound:
+ kpair = None
+
+ if pub_key_exists and not priv_key_exists:
+ raise EnvironmentError("Private key file doesn't exists")
+
+ if not pub_key_exists and priv_key_exists:
+ raise EnvironmentError("Public key file doesn't exists")
+
+ if kpair is None:
+ if pub_key_exists:
+ with open(pub_key_path) as pub_key_fd:
+ return nova.keypairs.create(name, pub_key_fd.read())
+ else:
+ key = nova.keypairs.create(name)
+
+ with open(priv_key_path, "w") as priv_key_fd:
+ priv_key_fd.write(key.private_key)
+ os.chmod(priv_key_path, stat.S_IREAD | stat.S_IWRITE)
+
+ with open(pub_key_path, "w") as pub_key_fd:
+ pub_key_fd.write(key.public_key)
+ elif not priv_key_exists:
+ raise EnvironmentError("Private key file doesn't exists," +
+ " but key uploaded openstack." +
+ " Either set correct path to private key" +
+ " or remove key from openstack")
+
+
def get_or_create_aa_group(nova, name):
try:
group = nova.server_groups.find(name=name)
@@ -221,24 +265,6 @@
pass
-def create_keypair(nova, name, pub_key_path, priv_key_path):
- try:
- nova.keypairs.find(name=name)
- # if file not found- delete and recreate
- except NotFound:
- if os.path.exists(pub_key_path):
- with open(pub_key_path) as pub_key_fd:
- return nova.keypairs.create(name, pub_key_fd.read())
- else:
- key = nova.keypairs.create(name)
-
- with open(priv_key_path, "w") as priv_key_fd:
- priv_key_fd.write(key.private_key)
-
- with open(pub_key_path, "w") as pub_key_fd:
- pub_key_fd.write(key.public_key)
-
-
def create_volume(size, name):
cinder = cinder_connect()
vol = cinder.volumes.create(size=size, display_name=name)
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index 48c1ec6..4093b06 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -1,12 +1,12 @@
[global]
include defaults.cfg
-NUMJOBS_R={% 1, 5, 10, 15, 25, 40, 80 %}
-NUMJOBS_W={% 1, 5, 10, 15, 25 %}
+NUMJOBS_R={% 1, 5, 10, 15, 25, 40, 80, 120 %}
+NUMJOBS_W={% 1, 5, 10, 15, 25, 40 %}
NUMJOBS_SEQ_OPS={% 1, 3, 10 %}
ramp_time=60
-runtime=240
+runtime=180
# ---------------------------------------------------------------------
# check different thread count, sync mode. (latency, iops) = func(th_count)
diff --git a/wally/suits/io/cinder_iscsi.cfg b/wally/suits/io/cinder_iscsi.cfg
index 01439b3..b0f575b 100644
--- a/wally/suits/io/cinder_iscsi.cfg
+++ b/wally/suits/io/cinder_iscsi.cfg
@@ -45,5 +45,6 @@
# ---------------------------------------------------------------------
[cinder_iscsi_{TEST_SUMM}]
ramp_time=240
+runtime=240
blocksize=4k
rw=randwrite
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index f9f1d1b..777ae5f 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -67,7 +67,7 @@
return TimeSeriesValue(vals)
-def load_test_results(cls, folder, run_num):
+def load_test_results(folder, run_num):
res = {}
params = None
@@ -101,17 +101,6 @@
mm_res[key] = MeasurementMatrix(matr, conn_ids)
- # iops_from_lat_matr = []
- # for node_ts in mm_res['lat'].data:
- # iops_from_lat_matr.append([])
- # for thread_ts in node_ts:
- # ndt = [(start + ln, 1000000. / val)
- # for (start, ln, val) in thread_ts.data]
- # new_ts = TimeSeriesValue(ndt)
- # iops_from_lat_matr[-1].append(new_ts)
-
- # mm_res['iops_from_lat'] = MeasurementMatrix(iops_from_lat_matr, conn_ids)
-
raw_res = {}
for conn_id in conn_ids:
fn = os.path.join(folder, "{0}_{1}_rawres.json".format(run_num, conn_id_s))
@@ -124,7 +113,7 @@
fio_task.vals.update(params['vals'])
config = TestConfig('io', params, None, params['nodes'], folder, None)
- return cls(config, fio_task, mm_res, raw_res, params['intervals'])
+ return FioRunResult(config, fio_task, mm_res, raw_res, params['intervals'], run_num)
class Attrmapper(object):
@@ -192,7 +181,24 @@
return perc_50 / 1000., perc_95 / 1000.
-class IOTestResult(TestResults):
+class IOTestResults(object):
+ def __init__(self, suite_name, fio_results, log_directory):
+ self.suite_name = suite_name
+ self.fio_results = fio_results
+ self.log_directory = log_directory
+
+ def __iter__(self):
+ return iter(self.fio_results)
+
+ def __len__(self):
+ return len(self.fio_results)
+
+ def get_yamable(self):
+ items = [(fio_res.summary(), fio_res.idx) for fio_res in self]
+ return {self.suite_name: [self.log_directory] + items}
+
+
+class FioRunResult(TestResults):
"""
Fio run results
config: TestConfig
@@ -201,18 +207,15 @@
raw_result: ????
run_interval:(float, float) - test tun time, used for sensors
"""
- def __init__(self, config, fio_task, ts_results, raw_result, run_interval):
+ def __init__(self, config, fio_task, ts_results, raw_result, run_interval, idx):
self.name = fio_task.name.rsplit("_", 1)[0]
self.fio_task = fio_task
+ self.idx = idx
self.bw = ts_results.get('bw')
self.lat = ts_results.get('lat')
self.iops = ts_results.get('iops')
- # self.iops_from_lat = ts_results.get('iops_from_lat')
-
- # self.slat = drop_warmup(res.get('clat', None), self.params)
- # self.clat = drop_warmup(res.get('slat', None), self.params)
res = {"bw": self.bw, "lat": self.lat, "iops": self.iops}
@@ -243,9 +246,6 @@
def summary_tpl(self):
return get_test_summary_tuple(self.fio_task, len(self.config.nodes))
- def get_yamable(self):
- return self.summary()
-
def get_lat_perc_50_95_multy(self):
lat_mks = collections.defaultdict(lambda: 0)
num_res = 0
@@ -278,23 +278,18 @@
self.params,
testnodes_count)
- # ramp_time = self.fio_task.vals.get('ramp_time', 0)
-
def prepare(data, drop=1):
if data is None:
return data
res = []
for ts_data in data:
- # if ramp_time > 0:
- # ts_data = ts_data.skip(ramp_time)
-
if ts_data.average_interval() < avg_interval:
ts_data = ts_data.derived(avg_interval)
# drop last value on bounds
# as they may contains ranges without activities
- assert len(ts_data.values) >= drop + 1
+ assert len(ts_data.values) >= drop + 1, str(drop) + " " + str(ts_data.values)
if drop > 0:
res.append(ts_data.values[:-drop])
@@ -412,11 +407,13 @@
self.fio_configs = list(self.fio_configs)
@classmethod
- def load(cls, folder):
+ def load(cls, suite_name, folder):
+ res = []
for fname in os.listdir(folder):
if re.match("\d+_params.yaml$", fname):
num = int(fname.split('_')[0])
- yield load_test_results(IOTestResult, folder, num)
+ res.append(load_test_results(folder, num))
+ return IOTestResults(suite_name, res, folder)
def cleanup(self):
# delete_file(conn, self.io_py_remote)
@@ -685,7 +682,7 @@
with open(os.path.join(self.config.log_directory, fname), "w") as fd:
fd.write(dumps(params))
- res = load_test_results(IOTestResult, self.config.log_directory, pos)
+ res = load_test_results(self.config.log_directory, pos)
results.append(res)
if self.max_latency is not None:
@@ -702,7 +699,8 @@
if self.min_bw_per_thread > average(test_res['bw']):
lat_bw_limit_reached.add(test_descr)
- return results
+ return IOTestResults(self.config.params['cfg'],
+ results, self.config.log_directory)
def do_run(self, node, barrier, fio_cfg, pos, nolog=False):
if self.use_sudo:
@@ -938,7 +936,6 @@
tab.set_cols_align([f.allign for f in cls.fiels_and_header])
sep = ["-" * f.size for f in cls.fiels_and_header]
tab.header([f.header for f in cls.fiels_and_header])
-
prev_k = None
for item in cls.prepare_data(results):
if prev_k is not None:
diff --git a/wally/suits/io/hdd.cfg b/wally/suits/io/hdd.cfg
index b3c6293..95c8cec 100644
--- a/wally/suits/io/hdd.cfg
+++ b/wally/suits/io/hdd.cfg
@@ -7,6 +7,7 @@
ramp_time=30
runtime=120
+
direct=1
# ---------------------------------------------------------------------
@@ -39,6 +40,5 @@
# check IOPS randwrite.
# ---------------------------------------------------------------------
[hdd_{TEST_SUMM}]
-ramp_time=240
blocksize=4k
rw=randwrite
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index 86a73c5..27cb1a0 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -20,15 +20,16 @@
# ---------------------------------------------------------------------
[test_{TEST_SUMM}]
-blocksize=64m
-rw=randread
-numjobs=1
+blocksize=4k
+rw=randwrite
+sync=1
+numjobs={% 10,50,100 %}
# ---------------------------------------------------------------------
-[test_{TEST_SUMM}]
-blocksize=4m
-rw=write
-numjobs=1
+# [test_{TEST_SUMM}]
+# blocksize=4m
+# rw=write
+# numjobs=1
# ---------------------------------------------------------------------
# [rws_{TEST_SUMM}]
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 35cf48a..7882633 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -2,24 +2,17 @@
include defaults.cfg
size={TEST_FILE_SIZE}
-ramp_time=90
-runtime=600
+ramp_time=0
+runtime=15
# ---------------------------------------------------------------------
[verify_{TEST_SUMM}]
blocksize=4k
rw=randwrite
direct=1
-# numjobs=5
-# ---------------------------------------------------------------------
[verify_{TEST_SUMM}]
blocksize=4k
rw=randwrite
+direct=1
sync=1
-
-# ---------------------------------------------------------------------
-# [verify_{TEST_SUMM}]
-# blocksize=4k
-# rw=randread
-# direct=1
diff --git a/wally/utils.py b/wally/utils.py
index ec4b8c0..64ddd84 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -348,3 +348,24 @@
pass
raise RuntimeError("Unknown os")
+
+
+@contextlib.contextmanager
+def empty_ctx(val=None):
+ yield val
+
+
+def mkdirs_if_unxists(path):
+ if not os.path.exists(path):
+ os.makedirs(path)
+
+
+def log_nodes_statistic(nodes):
+ logger.info("Found {0} nodes total".format(len(nodes)))
+ per_role = collections.defaultdict(lambda: 0)
+ for node in nodes:
+ for role in node.roles:
+ per_role[role] += 1
+
+ for role, count in sorted(per_role.items()):
+ logger.debug("Found {0} nodes with role {1}".format(count, role))