blob: af317b3d5ee920777aeb29eacaf5a8b25c18117f [file] [log] [blame]
import os
import re
import time
import logging
import functools
import contextlib
import collections
from yaml import load as _yaml_load

# yaml_load(text) parses a YAML document with the fastest available loader.
# NOTE: both CLoader and Loader can build arbitrary python objects, so
# yaml_load must only be used on trusted files.
try:
    # libyaml-backed loader, present only when PyYAML was built with libyaml
    from yaml import CLoader
    yaml_load = functools.partial(_yaml_load, Loader=CLoader)
except ImportError:
    # Pure-python fallback. The loader is passed explicitly: calling
    # yaml.load() without Loader is deprecated since PyYAML 5.1 and is
    # an error in PyYAML 6.
    from yaml import Loader
    yaml_load = functools.partial(_yaml_load, Loader=Loader)
from concurrent.futures import ThreadPoolExecutor
from wally.hw_info import get_hw_info
from wally.config import get_test_files
from wally.discover import discover, Node
from wally import pretty_yaml, utils, report, ssh_utils, start_vms
from wally.sensors_utils import with_sensors_util, sensors_info_util
from wally.suits.mysql import MysqlTest
from wally.suits.itest import TestConfig
from wally.suits.io.fio import IOPerfTest
from wally.suits.postgres import PgBenchTest
from wally.suits.omgbench import OmgTest
# Maps a test name, as used for keys of a config test block and of the
# stored results, to the suite class implementing that test.
TOOL_TYPE_MAPPER = {
    "io": IOPerfTest,
    "pgbench": PgBenchTest,
    "mysql": MysqlTest,
    "omg": OmgTest,
}

# Logger shared by all stages defined in this module.
logger = logging.getLogger("wally")
def connect_all(nodes, spawned_node=False):
    """
    Connect to all nodes in parallel and store the result on each node.

    nodes:[Node] - nodes to connect to; every node gets its `connection`
                   attribute set (None if the connection failed)
    spawned_node:bool - True if nodes are newly spawned VMs; the connect
                        timeout is then 240 instead of 30

    Raises utils.StopTestError if a node has an unsupported connection
    url or if any node with the 'testnode' role can't be connected.
    Connection failures of other nodes are only logged.
    """
    logger.info("Connecting to nodes")

    ssh_prefix = "ssh://"
    timeout = 240 if spawned_node else 30

    def connect_ext(conn_url):
        # never raises: a failed connection is logged and reported as None
        try:
            return ssh_utils.connect(conn_url, conn_timeout=timeout)
        except Exception as exc:
            logger.error("During connect to {0}: {1!s}".format(conn_url, exc))
            return None

    # validate all urls before any connection is attempted
    targets = []
    for node in nodes:
        url = node.conn_url
        if url == 'local':
            targets.append(url)
        elif url.startswith(ssh_prefix):
            targets.append(url[len(ssh_prefix):])
        else:
            msg = "Unknown url type {0}".format(url)
            logger.error(msg)
            raise utils.StopTestError(msg)

    with ThreadPoolExecutor(32) as pool:
        # pool.map keeps input order, so results line up with nodes
        for node, conn in zip(nodes, pool.map(connect_ext, targets)):
            node.connection = conn

    failed_testnodes = []
    failed_nodes = []
    for node in nodes:
        if node.connection is not None:
            continue
        if 'testnode' in node.roles:
            bucket = failed_testnodes
        else:
            bucket = failed_nodes
        bucket.append(node.get_conn_id())

    if failed_nodes:
        msg = "Node(s) {0} would be excluded - can't connect"
        logger.warning(msg.format(",".join(failed_nodes)))

    if failed_testnodes:
        msg = "Can't connect to testnode(s) " + ",".join(failed_testnodes)
        logger.error(msg)
        raise utils.StopTestError(msg)

    if not failed_nodes:
        logger.info("All nodes connected successfully")
def collect_hw_info_stage(cfg, ctx):
    """
    Collect hardware info from all nodes and store it on disk.

    Does nothing if the file cfg['hwreport_fname'] already exists.
    Otherwise get_hw_info is called for the connection of every node in
    ctx.nodes, the results are appended to ctx.hw_info and a text summary
    is written to cfg['hwreport_fname']. For every info with a known
    hostname the raw data is also stored in the hwinfo directory.
    """
    report_fname = cfg['hwreport_fname']

    if os.path.exists(report_fname):
        logger.info("{0} already exists. Skip hw info".format(report_fname))
        return

    with ThreadPoolExecutor(32) as pool:
        conns = (node.connection for node in ctx.nodes)
        ctx.hw_info.extend(pool.map(get_hw_info, conns))

    separator = "-" * 60

    with open(report_fname, 'w') as hwfd:
        for node, info in zip(ctx.nodes, ctx.hw_info):
            hwfd.write(separator + "\n")
            hwfd.write("Roles : " + ", ".join(node.roles) + "\n")
            hwfd.write(str(info) + "\n")
            hwfd.write(separator + "\n\n")

            if info.hostname is None:
                # no name to build a per-host file from
                continue

            raw_fname = os.path.join(cfg.hwinfo_directory,
                                     info.hostname + "_lshw.xml")
            with open(raw_fname, "w") as fd:
                fd.write(info.raw)

    logger.info("Hardware report stored in " + report_fname)
    logger.debug("Raw hardware info in " + cfg['hwinfo_directory'] + " folder")
@contextlib.contextmanager
def suspend_vm_nodes_ctx(unused_nodes):
    """
    Context manager, which keeps unused VMs paused while its body runs.

    unused_nodes:[Node] - nodes not used by the current test; only nodes
                          with os_vm_id set can be paused, the number of
                          the others is logged as a warning

    Yields the list of paused VM ids. The VMs are unpaused on exit, also
    when the body raises.
    """
    pausable_nodes_ids = []
    for node in unused_nodes:
        if node.os_vm_id is not None:
            pausable_nodes_ids.append(node.os_vm_id)

    non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
    if non_pausable != 0:
        logger.warning("Can't pause {0} nodes".format(non_pausable))

    if pausable_nodes_ids:
        msg = "Try to pause {0} unused nodes"
        logger.debug(msg.format(len(pausable_nodes_ids)))
        start_vms.pause(pausable_nodes_ids)

    try:
        yield pausable_nodes_ids
    finally:
        # the list is checked again here, as it was handed to the caller
        if pausable_nodes_ids:
            msg = "Unpausing {0} nodes"
            logger.debug(msg.format(len(pausable_nodes_ids)))
            start_vms.unpause(pausable_nodes_ids)
def generate_result_dir_name(results, name, params):
    """
    Select the path of the directory for results of one test run.

    results:str - existing directory, where all results are stored
    name:str - test name, used as directory name prefix
    params:dict - test params; with a 'name' key the directory is called
                  "{name}_{params['name']}", otherwise "{name}_{idx}"
                  with the smallest idx not yet present in results

    Returns the path. The directory itself is not created here.
    """
    existing = os.listdir(results)

    if 'name' in params:
        return os.path.join(results, "{0}_{1}".format(name, params['name']))

    # len(existing) + 1 candidates always contain at least one free name
    for idx in range(len(existing) + 1):
        candidate = "{0}_{1}".format(name, idx)
        if candidate not in existing:
            return os.path.join(results, candidate)

    raise utils.StopTestError("Can't select directory for test results")
def run_tests(cfg, test_block, nodes):
    """
    Run every test from a test block, yielding the results per test name.

    cfg - run config; results_storage, settings, run_uuid, sensor_storage
          and default_test_local_folder are read from it
    test_block:dict - maps a test name (a key of TOOL_TYPE_MAPPER) to the
                      params of this test
    nodes:[Node] - all known nodes; nodes with the 'testnode' role run
                   the tests, the others only take part in sensor
                   collection

    The optional 'node_limit' param selects how many test nodes are used.
    It is an integer, 'all', or a list/tuple of those - the test is then
    repeated once per entry. By default all test nodes are used.

    Generator: yields (name, results) once per test name, results holds
    one run() result per executed node count. Nothing is yielded if there
    are no test nodes.

    Raises ValueError if 'node_limit' is malformed.
    """
    test_nodes = [node for node in nodes if 'testnode' in node.roles]
    not_test_nodes = [node for node in nodes if 'testnode' not in node.roles]

    if len(test_nodes) == 0:
        logger.error("No test nodes found")
        return

    try:
        integer_types = (int, long)  # python 2
    except NameError:
        integer_types = (int,)  # python 3 has no separate long type

    def is_node_count(val):
        # 'all' is handled explicitly when test nodes are selected below
        return val == 'all' or isinstance(val, integer_types)

    for name, params in test_block.items():
        results = []

        # iterate over all node counts
        limit = params.get('node_limit', len(test_nodes))
        if is_node_count(limit):
            vm_limits = [limit]
        elif isinstance(limit, (tuple, list)) and \
                all(is_node_count(climit) for climit in limit):
            vm_limits = limit
        else:
            msg = ("'node_limit' parameter in config should be either " +
                   "int, 'all' or list of them, not {0!r}").format(limit)
            raise ValueError(msg)

        for vm_count in vm_limits:
            # select test nodes
            if vm_count == 'all':
                curr_test_nodes = test_nodes
                unused_nodes = []
            else:
                curr_test_nodes = test_nodes[:vm_count]
                unused_nodes = test_nodes[vm_count:]

            if 0 == len(curr_test_nodes):
                continue

            results_path = generate_result_dir_name(cfg.results_storage,
                                                    name, params)
            utils.mkdirs_if_unxists(results_path)

            # suspend all unused virtual nodes
            if cfg.settings.get('suspend_unused_vms', True):
                suspend_ctx = suspend_vm_nodes_ctx(unused_nodes)
            else:
                suspend_ctx = utils.empty_ctx()

            with suspend_ctx:
                # nodes used now may be left paused by a previous run
                resumable_nodes_ids = [node.os_vm_id
                                       for node in curr_test_nodes
                                       if node.os_vm_id is not None]

                if len(resumable_nodes_ids) != 0:
                    logger.debug("Check and unpause {0} nodes".format(
                                 len(resumable_nodes_ids)))
                    start_vms.unpause(resumable_nodes_ids)

                sens_nodes = curr_test_nodes + not_test_nodes
                with sensors_info_util(cfg, sens_nodes) as sensor_data:
                    test_cls = TOOL_TYPE_MAPPER[name]
                    remote_dir = cfg.default_test_local_folder.format(
                        name=name)

                    # Only the nodes selected for this run get the test:
                    # the rest of test_nodes may be paused by suspend_ctx.
                    test_cfg = TestConfig(test_cls.__name__,
                                          params=params,
                                          test_uuid=cfg.run_uuid,
                                          nodes=curr_test_nodes,
                                          log_directory=results_path,
                                          remote_dir=remote_dir)

                    t_start = time.time()
                    res = test_cls(test_cfg).run()
                    t_end = time.time()

                # save sensor data
                if sensor_data is not None:
                    fname = "{0}_{1}.csv".format(int(t_start), int(t_end))
                    fpath = os.path.join(cfg.sensor_storage, fname)

                    with open(fpath, "w") as fd:
                        fd.write("\n\n".join(sensor_data))

                results.append(res)

        yield name, results
def connect_stage(cfg, ctx):
    """
    Connect to all nodes from ctx.nodes, keep only the connected ones.

    disconnect_stage is added to ctx.clear_calls_stack before any
    connection is made.
    """
    ctx.clear_calls_stack.append(disconnect_stage)
    connect_all(ctx.nodes)

    # connect_all leaves connection == None on nodes it could not reach
    connected = [node for node in ctx.nodes if node.connection is not None]
    ctx.nodes = connected
def discover_stage(cfg, ctx):
    """
    Add discovered and explicitly configured nodes to ctx.nodes.

    Discovery runs only if cfg has a 'discover' entry, which is a comma
    separated list of objects to discover. Every item of the optional
    'explicit_nodes' mapping (url -> comma separated roles) is added as a
    Node as well.
    """
    if cfg.get('discover') is not None:
        discover_objs = []
        for obj_name in cfg.discover.strip().split(","):
            discover_objs.append(obj_name.strip())

        found_nodes = discover(ctx,
                               discover_objs,
                               cfg.clouds,
                               cfg.results_storage,
                               not cfg.dont_discover_nodes)
        ctx.nodes.extend(found_nodes)

    explicit_nodes = cfg.get('explicit_nodes', {})
    for url, roles in explicit_nodes.items():
        ctx.nodes.append(Node(url, roles.split(",")))
def save_nodes_stage(cfg, ctx):
    """
    Store connection urls and roles of cluster nodes in the nodes report.

    The 'testnode' role is not stored; nodes left without any role are
    skipped. The result is written as yaml to cfg.nodes_report_file.
    """
    cluster = {}

    for node in ctx.nodes:
        # work on a copy - roles of the node itself must stay intact
        other_roles = list(node.roles)
        if 'testnode' in other_roles:
            other_roles.remove('testnode')

        if other_roles:
            cluster[node.conn_url] = other_roles

    with open(cfg.nodes_report_file, "w") as report_fd:
        report_fd.write(pretty_yaml.dumps(cluster))
def reuse_vms_stage(cfg, ctx):
    """
    Add already running openstack VMs to ctx.nodes as test nodes.

    VMs are selected by the "user@vm_name_pattern" entries of the 'vms'
    list from the 'openstack' section of cfg 'clouds'. Every found VM
    becomes a Node with the 'testnode' role, an ssh url using the private
    key from get_vm_keypair and os_vm_id set to the VM id.
    """
    openstack_cfg = cfg.get('clouds', {}).get('openstack', {})
    vms_patterns = openstack_cfg.get('vms', [])
    private_key_path = get_vm_keypair(cfg)['keypair_file_private']

    for creds in vms_patterns:
        user_name, vm_name_pattern = creds.split("@", 1)
        fail_msg = "Vm like {0} lookup failed".format(vm_name_pattern)

        with utils.log_error(fail_msg):
            logger.debug("Looking for vm with name like {0}".format(
                         vm_name_pattern))

            # credentials are needed only for the first connection
            if start_vms.is_connected():
                os_creds = None
            else:
                os_creds = get_OS_credentials(cfg, ctx)

            conn = start_vms.nova_connect(os_creds)

            for ip, vm_id in start_vms.find_vms(conn, vm_name_pattern):
                url_templ = "ssh://{user}@{ip}::{key}"
                conn_url = url_templ.format(user=user_name,
                                            ip=ip,
                                            key=private_key_path)
                node = Node(conn_url, ['testnode'])
                node.os_vm_id = vm_id
                ctx.nodes.append(node)
def get_OS_credentials(cfg, ctx):
    """
    Get openstack credentials from the config or from fuel.

    The 'openstack' section of cfg.clouds is checked first; the first key
    found selects the source:
        'OPENRC' - credentials are read from the given openrc file
        'ENV' - credentials are taken from the shell environment
        'OS_TENANT_NAME' - OS_USERNAME, OS_PASSWORD, OS_TENANT_NAME,
                           OS_AUTH_URL and optional OS_INSECURE are
                           taken from the section itself
        'OS_INSECURE' - no credentials here, only forces the insecure
                        flag on credentials got from fuel

    If the section gives no credentials, ctx.fuel_openstack_creds is used
    when cfg.clouds['fuel'] has an 'openstack_env' key.

    Returns start_vms.OSCreds.
    Raises utils.StopTestError if no credentials are found.
    """
    creds = None
    os_creds = None
    force_insecure = False

    if 'openstack' in cfg.clouds:
        os_cfg = cfg.clouds['openstack']
        if 'OPENRC' in os_cfg:
            logger.info("Using OS credentials from " + os_cfg['OPENRC'])
            creds_tuple = utils.get_creds_openrc(os_cfg['OPENRC'])
            os_creds = start_vms.OSCreds(*creds_tuple)
        elif 'ENV' in os_cfg:
            logger.info("Using OS credentials from shell environment")
            os_creds = start_vms.ostack_get_creds()
        elif 'OS_TENANT_NAME' in os_cfg:
            logger.info("Using predefined credentials")
            os_creds = start_vms.OSCreds(os_cfg['OS_USERNAME'].strip(),
                                         os_cfg['OS_PASSWORD'].strip(),
                                         os_cfg['OS_TENANT_NAME'].strip(),
                                         os_cfg['OS_AUTH_URL'].strip(),
                                         os_cfg.get('OS_INSECURE', False))
        elif 'OS_INSECURE' in os_cfg:
            # NOTE(review): this branch is reached only if none of the
            # keys above is present, so OS_INSECURE next to OPENRC or
            # ENV is ignored - confirm this is intended
            force_insecure = os_cfg.get('OS_INSECURE', False)

    if os_creds is None and 'fuel' in cfg.clouds and \
       'openstack_env' in cfg.clouds['fuel'] and \
       ctx.fuel_openstack_creds is not None:
        logger.info("Using fuel creds")
        creds = start_vms.OSCreds(**ctx.fuel_openstack_creds)
    elif os_creds is None:
        logger.error("Can't found OS credentials")
        raise utils.StopTestError("Can't found OS credentials", None)

    if creds is None:
        creds = os_creds

    if force_insecure and not creds.insecure:
        # rebuild credentials with the insecure flag switched on
        creds = start_vms.OSCreds(creds.name,
                                  creds.passwd,
                                  creds.tenant,
                                  creds.auth_url,
                                  True)

    # the password is not logged on purpose
    logger.debug(("OS_CREDS: user={0.name} tenant={0.tenant} " +
                  "auth_url={0.auth_url} insecure={0.insecure}").format(creds))

    return creds
def get_vm_keypair(cfg):
    """
    Get paths of the private and public key files used for VMs.

    For each of 'keypair_file_private' and 'keypair_file_public' the path
    is taken from cfg.vm_configs, or defaults to the 'keypair_name' value
    with a ".pem" / ".pub" extension. Relative paths are resolved against
    cfg.config_folder.

    Returns a dict with both keys.
    """
    default_exts = (('keypair_file_private', 'pem'),
                    ('keypair_file_public', 'pub'))
    keypair = {}

    for field, ext in default_exts:
        fpath = cfg.vm_configs.get(field)

        if fpath is None:
            fpath = cfg.vm_configs['keypair_name'] + "." + ext

        if not os.path.isabs(fpath):
            fpath = os.path.join(cfg.config_folder, fpath)

        keypair[field] = fpath

    return keypair
@contextlib.contextmanager
def create_vms_ctx(ctx, cfg, config, already_has_count=0):
    """
    Context manager, which spawns openstack VMs for the time of its body.

    ctx - run context; new nodes are added to ctx.nodes and their ids are
          stored in ctx.openstack_nodes_ids
    cfg - run config
    config:dict - openstack block of a 'start_test_nodes' section;
                  'count' and 'cfg_name' are required
    already_has_count:int - number of test nodes, which already exist

    Yields the list of new nodes; the list is empty and nothing is
    spawned if 'count' has the "=N" form and N <= already_has_count.

    On exit the VMs are removed, unless cfg.keep_vm is set, and ctx.nodes
    is restored to the state it had before spawning.
    """
    requested = config['count']
    if requested.startswith('=') and int(requested[1:]) <= already_has_count:
        logger.debug("Not need new vms")
        yield []
        return

    params = cfg.vm_configs[config['cfg_name']].copy()

    # credentials are needed only for the first connection
    if start_vms.is_connected():
        os_creds = None
    else:
        os_creds = get_OS_credentials(cfg, ctx)

    nova = start_vms.nova_connect(os_creds)

    # later updates win: section config, then key paths, then run values
    params.update(config)
    params.update(get_vm_keypair(cfg))
    params['group_name'] = cfg.run_uuid
    params['keypair_name'] = cfg.vm_configs['keypair_name']

    if not config.get('skip_preparation', False):
        logger.info("Preparing openstack")
        start_vms.prepare_os_subpr(nova, params, os_creds)

    os_nodes_ids = []
    new_nodes = []
    old_nodes = ctx.nodes[:]

    try:
        launched = start_vms.launch_vms(nova, params, already_has_count)
        for new_node, node_id in launched:
            new_node.roles.append('testnode')
            ctx.nodes.append(new_node)
            os_nodes_ids.append(node_id)
            new_nodes.append(new_node)

        store_nodes_in_log(cfg, os_nodes_ids)
        ctx.openstack_nodes_ids = os_nodes_ids

        yield new_nodes
    finally:
        if not cfg.keep_vm:
            shut_down_vms_stage(cfg, ctx)
        ctx.nodes = old_nodes
def run_tests_stage(cfg, ctx):
    """
    Run all tests from the 'tests' section of cfg, collect the results.

    Every item of the section is a dict with exactly one key. The
    'start_test_nodes' key spawns openstack VMs for the tests listed in
    its own 'tests' entry; any other item is a test block, which is run
    on already known nodes.

    Results are stored in ctx.results - a defaultdict, which maps a test
    name to the list of its results. Tests are not run if cfg.no_tests
    is set.

    Raises utils.StopTestError if an item has not exactly one key or if a
    'start_test_nodes' item has no 'openstack' block.
    """
    ctx.results = collections.defaultdict(list)

    for group in cfg.get('tests', []):

        if len(group) != 1:
            msg = "Items in tests section should have len == 1"
            logger.error(msg)
            raise utils.StopTestError(msg)

        # items() is not indexable on python 3, so take the only pair
        # through an iterator
        key, config = next(iter(group.items()))

        if 'start_test_nodes' == key:
            if 'openstack' not in config:
                msg = "No openstack block in config - can't spawn vm's"
                logger.error(msg)
                raise utils.StopTestError(msg)

            num_test_nodes = sum(1 for node in ctx.nodes
                                 if 'testnode' in node.roles)

            vm_ctx = create_vms_ctx(ctx, cfg, config['openstack'],
                                    num_test_nodes)
            tests = config.get('tests', [])
        else:
            vm_ctx = utils.empty_ctx([])
            tests = [group]

        if cfg.get('sensors') is None:
            sensor_ctx = utils.empty_ctx()
        else:
            sensor_ctx = with_sensors_util(cfg.get('sensors'), ctx.nodes)

        with vm_ctx as new_nodes:
            if len(new_nodes) != 0:
                connect_all(new_nodes, True)

            if not cfg.no_tests:
                for test_group in tests:
                    # NOTE(review): sensor_ctx is entered once per test
                    # group; this works only if the object supports
                    # re-entering - generator based context managers
                    # don't. Verify against with_sensors_util and
                    # utils.empty_ctx.
                    with sensor_ctx:
                        for tp, res in run_tests(cfg, test_group, ctx.nodes):
                            ctx.results[tp].extend(res)
def shut_down_vms_stage(cfg, ctx):
    """
    Remove VMs spawned by this tool and delete the file with their ids.

    Ids are taken from ctx.openstack_nodes_ids or, if it is None, from
    the whitespace separated list in the cfg.vm_ids_fname file. A missing
    file means there is nothing to remove, so that cleanup after a failed
    spawn doesn't raise an error of its own.
    """
    vm_ids_fname = cfg.vm_ids_fname

    if ctx.openstack_nodes_ids is not None:
        nodes_ids = ctx.openstack_nodes_ids
    elif os.path.exists(vm_ids_fname):
        # the file is closed explicitly, as it is deleted below
        with open(vm_ids_fname) as fd:
            nodes_ids = fd.read().split()
    else:
        nodes_ids = []

    if len(nodes_ids) != 0:
        logger.info("Removing nodes")
        start_vms.clear_nodes(nodes_ids)
        logger.info("Nodes has been removed")

    if os.path.exists(vm_ids_fname):
        os.remove(vm_ids_fname)
def store_nodes_in_log(cfg, nodes_ids):
    """
    Store ids of spawned VMs, one per line, in the cfg.vm_ids_fname file.

    Any previous content of the file is overwritten.
    """
    ids_fname = cfg.vm_ids_fname
    with open(ids_fname, 'w') as ids_fd:
        ids_fd.write("\n".join(nodes_ids))
def clear_enviroment(cfg, ctx):
    """
    Remove VMs left from a previous run.

    Does nothing unless the cfg.vm_ids_fname file exists.
    """
    if not os.path.exists(cfg.vm_ids_fname):
        return

    shut_down_vms_stage(cfg, ctx)
def disconnect_stage(cfg, ctx):
    """
    Close all ssh sessions and the connections of all nodes in ctx.nodes.
    """
    ssh_utils.close_all_sessions()

    for node in ctx.nodes:
        if node.connection is None:
            # this node was never connected
            continue
        node.connection.close()
def store_raw_results_stage(cfg, ctx):
    """
    Append results of this run to the raw results yaml file.

    The list already stored in cfg.raw_results, if the file exists, is
    extended with the (test name, results) pairs of ctx.results and
    written back.
    """
    if os.path.exists(cfg.raw_results):
        with open(cfg.raw_results) as fd:
            # an empty yaml document is loaded as None - treat it the
            # same way as a missing file
            cont = yaml_load(fd.read()) or []
    else:
        cont = []

    cont.extend(utils.yamable(ctx.results).items())
    raw_data = pretty_yaml.dumps(cont)

    with open(cfg.raw_results, "w") as fd:
        fd.write(raw_data)
def console_report_stage(cfg, ctx):
    """
    Write text reports for all results to a file and to the console.

    Reports are built by the format_for_console method of the suite
    classes for the 'io', 'mysql', 'pgbench' and 'omg' results; for any
    other test name a warning is logged. Every report is appended to the
    cfg.text_report_file file and printed to stdout.
    """
    text_rep_fname = cfg.text_report_file
    fname_logged = False

    with open(text_rep_fname, "w") as fd:
        for tp, data in ctx.results.items():
            has_data = data is not None

            if tp == 'io' and has_data:
                rep = "\n\n".join(
                    IOPerfTest.format_for_console(list(result))
                    for result in data)
            elif tp in ('mysql', 'pgbench') and has_data:
                # NOTE(review): pgbench results are formatted by
                # MysqlTest as well - presumably they share the format
                rep = MysqlTest.format_for_console(data)
            elif tp == 'omg':
                rep = OmgTest.format_for_console(data)
            else:
                logger.warning("Can't generate text report for " + tp)
                continue

            fd.write(rep)
            fd.write("\n")

            # the file name is logged once, before the first report
            if not fname_logged:
                logger.info("Text report were stored in " + text_rep_fname)
                fname_logged = True

            print("\n" + rep + "\n")
def test_load_report_stage(cfg, ctx):
    """
    Make the load report for the 'io' results from ctx.results.

    report.make_load_report is called with the position of the 'io' item
    in ctx.results, cfg['results'] and cfg.load_report_file. Only one
    report is made; an error is logged for any further 'io' block.
    """
    load_rep_fname = cfg.load_report_file
    report_done = False

    for idx, (tp, data) in enumerate(ctx.results.items()):
        if tp != 'io' or data is None:
            continue

        if report_done:
            logger.error("Making reports for more than one "
                         "io block isn't supported! All "
                         "report, except first are skipped")
            continue

        report_done = True
        report.make_load_report(idx, cfg['results'], load_rep_fname)
def html_report_stage(cfg, ctx):
    """
    Make the html report for the first 'io' result from ctx.results.

    The report is written to cfg.html_report_file by
    report.make_io_report, with the optional cfg 'comment' and
    ctx.hw_info as lab info. Only one result is supported: if there are
    more, an error is logged and the report is made for the first one
    only. Nothing is done if there are no 'io' results.
    """
    html_rep_fname = cfg.html_report_file
    found = False

    for tp, data in ctx.results.items():
        if 'io' != tp or data is None:
            continue

        if len(data) == 0:
            # no result was produced for this block, nothing to report
            continue

        if found or len(data) > 1:
            logger.error("Making reports for more than one " +
                         "io block isn't supported! All " +
                         "report, except first are skipped")

        if found:
            continue

        found = True
        # the first result is still reported, as the message above says
        report.make_io_report(list(data[0]),
                              cfg.get('comment', ''),
                              html_rep_fname,
                              lab_info=ctx.hw_info)
def load_data_from_path(test_res_dir):
    """
    Load results of a previous run from its results directory.

    test_res_dir:str - directory, which get_test_files resolves to the
                       files of the run; only 'raw_results' is used

    The raw results file is a list of (test name, test lists) pairs;
    every test maps a suite name to suite data, whose first item is the
    folder with results. The results are loaded by the load method of
    the class found for the test name in TOOL_TYPE_MAPPER.

    Returns a defaultdict, which maps a test name to the list of loaded
    results.
    """
    files = get_test_files(test_res_dir)

    # the file is closed as soon as it is read
    with open(files['raw_results']) as fd:
        raw_res = yaml_load(fd.read())

    res = collections.defaultdict(list)

    for tp, test_lists in raw_res:
        test_cls = TOOL_TYPE_MAPPER[tp]
        for tests in test_lists:
            for suite_name, suite_data in tests.items():
                result_folder = suite_data[0]
                res[tp].append(test_cls.load(suite_name, result_folder))

    return res
def load_data_from_path_stage(var_dir, _, ctx):
    """
    Stage, which adds results loaded from var_dir to ctx.results.

    The second parameter takes the config passed to every stage and is
    not used.
    """
    loaded = load_data_from_path(var_dir)

    for tp, vals in loaded.items():
        known_results = ctx.results.setdefault(tp, [])
        known_results.extend(vals)
def load_data_from(var_dir):
    """
    Make a (cfg, ctx) stage, which loads results stored in var_dir.
    """
    stage = functools.partial(load_data_from_path_stage, var_dir)
    return stage