blob: 5e09d6d2df92af9710e99c05918c016a92e7a3bf [file] [log] [blame]
import os.path
import socket
import logging
from typing import Dict, Any, List, Tuple, cast, Optional
from .node_interfaces import NodeInfo
from .config import ConfigBlock, Config
from .ssh_utils import ConnCreds
from .openstack_api import (os_connect, find_vms,
OSCreds, get_openstack_credentials, prepare_os, launch_vms, clear_nodes)
from .test_run_class import TestRun
from .stage import Stage, StepOrder
from .utils import LogError, StopTestError, get_creds_openrc
# Module-level logger; every message from this file is emitted under the "wally" logger name.
logger = logging.getLogger("wally")
def get_floating_ip(vm: Any) -> str:
    """Return the address of the first floating interface found on *vm*.

    ``vm.addresses`` is walked network by network; within each network the
    interfaces are checked in order and the first one whose
    ``'OS-EXT-IPS:type'`` equals ``"floating"`` provides the result.

    Raises:
        ValueError: no interface of *vm* is marked as floating.
    """
    # Dedicated sentinel, so any value stored under 'addr' is returned untouched.
    not_found = object()

    floating_addrs = (
        port['addr']
        for _net, ports in vm.addresses.items()
        for port in ports
        if port.get('OS-EXT-IPS:type') == "floating"
    )

    address = next(floating_addrs, not_found)
    if address is not_found:
        raise ValueError("VM {} has no floating ip".format(vm))
    return address
def ensure_connected_to_openstack(ctx: TestRun) -> None:
    """Open the OpenStack connection for *ctx* unless one already exists.

    When ``ctx.os_connection`` is already set the function returns without
    touching anything. Otherwise credentials are taken from ``ctx.os_creds``
    (resolved through get_OS_credentials() and cached there if still unset)
    and the result of os_connect() is stored in ``ctx.os_connection``.
    """
    # The original guard was inverted ("if not ... is None"): it reconnected
    # when a connection existed and did nothing when there was none.
    if ctx.os_connection is not None:
        return

    if ctx.os_creds is None:
        ctx.os_creds = get_OS_credentials(ctx)

    ctx.os_connection = os_connect(ctx.os_creds)
def get_OS_credentials(ctx: TestRun) -> OSCreds:
    """Resolve the OpenStack credentials for this run and cache them in storage.

    Lookup order:

    1. ``ctx.storage["openstack_openrc"]`` - if present it is loaded and
       returned immediately.
    2. The ``'openstack'`` entry of ``ctx.config.clouds``; exactly one of the
       following keys is honoured, checked in this order: ``OPENRC``
       (passed to get_creds_openrc()), ``ENV`` (get_openstack_credentials()),
       ``OS_TENANT_NAME`` (inline ``OS_*`` values), ``OS_INSECURE`` (no
       credentials, only a request to force the insecure flag).
    3. ``ctx.fuel_openstack_creds``, when ``ctx.config.clouds`` has a
       ``'fuel'`` entry containing ``'openstack_env'``.

    The selected credentials are logged (without the password), written to
    ``ctx.storage["openstack_openrc"]`` and returned.

    Raises:
        StopTestError: no source above produced credentials.
    """
    if "openstack_openrc" in ctx.storage:
        return ctx.storage.load(OSCreds, "openstack_openrc")

    clouds_cfg = ctx.config.clouds
    configured = None
    insecure_override = False

    if 'openstack' in clouds_cfg:
        os_cfg = clouds_cfg['openstack']

        if 'OPENRC' in os_cfg:
            logger.info("Using OS credentials from " + os_cfg['OPENRC'])
            configured = OSCreds(*get_creds_openrc(os_cfg['OPENRC']))
        elif 'ENV' in os_cfg:
            logger.info("Using OS credentials from shell environment")
            configured = get_openstack_credentials()
        elif 'OS_TENANT_NAME' in os_cfg:
            logger.info("Using predefined credentials")
            user_name = os_cfg['OS_USERNAME'].strip()
            password = os_cfg['OS_PASSWORD'].strip()
            tenant = os_cfg['OS_TENANT_NAME'].strip()
            auth_url = os_cfg['OS_AUTH_URL'].strip()
            configured = OSCreds(user_name, password, tenant, auth_url,
                                 os_cfg.get('OS_INSECURE', False))
        elif 'OS_INSECURE' in os_cfg:
            # Only reached when no credentials are configured in this section,
            # so the override can only affect credentials taken from fuel.
            insecure_override = os_cfg.get('OS_INSECURE', False)

    if configured is not None:
        creds = configured
    elif ('fuel' in clouds_cfg and 'openstack_env' in clouds_cfg['fuel']
            and ctx.fuel_openstack_creds is not None):
        logger.info("Using fuel creds")
        creds = ctx.fuel_openstack_creds
    else:
        logger.error("Can't found OS credentials")
        raise StopTestError("Can't found OS credentials", None)

    if insecure_override and not creds.insecure:
        creds = OSCreds(creds.name, creds.passwd, creds.tenant, creds.auth_url, True)

    logger.debug("OS_CREDS: user={0.name} tenant={0.tenant} "
                 "auth_url={0.auth_url} insecure={0.insecure}".format(creds))

    ctx.storage["openstack_openrc"] = creds  # type: ignore
    return creds
def get_vm_keypair_path(cfg: Config) -> Tuple[str, str]:
    """Return ``(private_key_path, public_key_path)`` for the configured VM keypair.

    Both files live in ``cfg.settings_dir`` and are named after
    ``cfg.vm_configs['keypair_name']`` with the suffixes ``_private.pem`` and
    ``_public.pub`` respectively. The paths are only computed; nothing is
    read from or written to disk here.
    """
    keypair = cfg.vm_configs['keypair_name']
    private_path, public_path = (
        os.path.join(cfg.settings_dir, keypair + suffix)
        for suffix in ("_private.pem", "_public.pub")
    )
    return private_path, public_path
class DiscoverOSStage(Stage):
    """Discover openstack nodes and VMS.

    The stage has two independent parts:

    * service-node discovery - every host reported by the nova service list
      is merged into the run with the binaries found on it as roles;
    * test-VM lookup - VMs matching the ``vms`` patterns of the openstack
      config are registered in ``ctx.nodes_info`` with the ``testnode`` role.
    """

    config_block = 'openstack'

    # discover FUEL cluster first
    priority = StepOrder.DISCOVER + 1

    @classmethod
    def validate(cls, conf: ConfigBlock) -> None:
        """Accept any configuration; this stage performs no validation."""
        pass

    def run(self, ctx: TestRun) -> None:
        """Run service-node discovery and test-VM lookup for *ctx*.

        Does nothing when ``'all_nodes'`` is already present in
        ``ctx.storage``.

        ``ctx.config.openstack.auth`` is expected in the form
        ``user:password:key_file`` (an empty password becomes ``None``) or
        ``user:password``; any other number of ``:`` separators makes the
        two-way unpacking fail with ValueError.

        Raises:
            StopTestError: a discovered VM has the same node id as a node
                already present in ``ctx.nodes_info``.
        """
        if 'all_nodes' in ctx.storage:
            logger.debug("Skip openstack discovery, use previously discovered nodes")
            return

        ensure_connected_to_openstack(ctx)

        cfg = ctx.config.openstack
        os_nodes_auth = cfg.auth  # type: str

        if os_nodes_auth.count(":") == 2:
            user, password, key_file = os_nodes_auth.split(":")  # type: str, Optional[str], Optional[str]
            if not password:
                password = None
        else:
            user, password = os_nodes_auth.split(":")
            key_file = None

        if ctx.config.discovery not in ('disabled', 'metadata'):
            host_services_mapping = {}  # type: Dict[str, List[str]]

            for service in ctx.os_connection.nova.services.list():
                ip = cast(str, socket.gethostbyname(service.host))
                # setdefault() stores the fresh list in the mapping. The former
                # .get(ip, []) appended to a throw-away list, so the mapping
                # stayed empty and no service node was ever registered.
                host_services_mapping.setdefault(ip, []).append(service.binary)

            logger.debug("Found %s openstack service nodes", len(host_services_mapping))

            for host, host_services in host_services_mapping.items():
                creds = ConnCreds(host=host, user=user, passwd=password, key_file=key_file)
                ctx.merge_node(creds, set(host_services))
            # TODO: log OS nodes discovery results
        else:
            logger.info("Scip OS cluster discovery due to 'discovery' setting value")

        private_key_path = get_vm_keypair_path(ctx.config)[0]

        # Each entry has the form "<user>@<vm name pattern>".
        for vm_creds in cfg.get("vms", []):
            user_name, vm_name_pattern = vm_creds.split("@", 1)
            msg = "Vm like {} lookup failed".format(vm_name_pattern)

            with LogError(msg):
                msg = "Looking for vm with name like {0}".format(vm_name_pattern)
                logger.debug(msg)

                ensure_connected_to_openstack(ctx)

                for ip, vm_id in find_vms(ctx.os_connection, vm_name_pattern):
                    creds = ConnCreds(host=ip, user=user_name, key_file=private_key_path)
                    info = NodeInfo(creds, {'testnode'})
                    info.os_vm_id = vm_id
                    nid = info.node_id()
                    if nid in ctx.nodes_info:
                        logger.error("Test VM node has the same id(%s), as existing node %s", nid, ctx.nodes_info[nid])
                        raise StopTestError()
                    ctx.nodes_info[nid] = info
class CreateOSVMSStage(Stage):
    """Spawn new VM's in Openstack cluster and register them as test nodes."""

    priority = StepOrder.SPAWN  # type: int
    config_block = 'spawn_os_vms'  # type: str

    def run(self, ctx: TestRun) -> None:
        """Launch the configured VMs and record their ids.

        When ``'all_nodes'`` is already in ``ctx.storage`` nothing is spawned;
        the previously stored ``'os_spawned_nodes_ids'`` list is loaded into
        ``ctx.os_spawned_nodes_ids`` instead. Otherwise every VM returned by
        launch_vms() gets the ``testnode`` role, is added to
        ``ctx.nodes_info``, and the collected VM ids are written to
        ``ctx.storage['os_spawned_nodes_ids']``.

        Raises:
            StopTestError: storage holds spawned ids without ``'all_nodes'``,
                or a launched VM has the same node id as an existing node.
        """
        if 'all_nodes' in ctx.storage:
            ctx.os_spawned_nodes_ids = ctx.storage['os_spawned_nodes_ids']  # type: ignore
            logger.info("Skipping OS VMS discovery/spawn as all data found in storage")
            return

        if 'os_spawned_nodes_ids' in ctx.storage:
            logger.error("os_spawned_nodes_ids is found in storage, but no nodes_info is stored. " +
                         "Fix this before continue")
            raise StopTestError()

        vm_spawn_config = ctx.config.spawn_os_vms
        vm_image_config = ctx.config.vm_configs[vm_spawn_config.cfg_name]

        ensure_connected_to_openstack(ctx)

        params = vm_image_config.copy()
        params.update(vm_spawn_config)

        # get_vm_keypair_path() returns a (private, public) tuple of path
        # strings. Feeding that tuple to update() would treat each string as a
        # (key, value) pair and fail, so the paths are stored explicitly.
        # NOTE(review): the key names below are assumed to be the ones
        # prepare_os()/launch_vms() read from params - confirm against
        # openstack_api before relying on them.
        private_path, public_path = get_vm_keypair_path(ctx.config)
        params['keypair_file_private'] = private_path
        params['keypair_file_public'] = public_path

        params['group_name'] = ctx.config.run_uuid
        params['keypair_name'] = ctx.config.vm_configs['keypair_name']

        if not ctx.config.openstack.get("skip_preparation", False):
            logger.info("Preparing openstack")
            prepare_os(ctx.os_connection, params)
        else:
            logger.info("Scip openstack preparation as 'skip_preparation' is set")

        ctx.os_spawned_nodes_ids = []
        with ctx.get_pool() as pool:
            for info in launch_vms(ctx.os_connection, params, pool):
                info.roles.add('testnode')
                nid = info.node_id()
                if nid in ctx.nodes_info:
                    logger.error("Test VM node has the same id(%s), as existing node %s", nid, ctx.nodes_info[nid])
                    raise StopTestError()
                ctx.nodes_info[nid] = info
                ctx.os_spawned_nodes_ids.append(info.os_vm_id)

        ctx.storage['os_spawned_nodes_ids'] = ctx.os_spawned_nodes_ids  # type: ignore

    def cleanup(self, ctx: TestRun) -> None:
        """Remove the VMs spawned by this stage unless ``keep_vm`` is set.

        After the VMs are removed, the ``'os_spawned_nodes_ids'`` record is
        dropped from ``ctx.storage``.
        """
        # keep nodes in case of error for future test restart
        if not ctx.config.keep_vm and ctx.os_spawned_nodes_ids:
            logger.info("Removing nodes")

            clear_nodes(ctx.os_connection, ctx.os_spawned_nodes_ids)

            # run() stores the ids under 'os_spawned_nodes_ids'; the former
            # "del ctx.storage['spawned_os_nodes']" addressed a key that is
            # never written. The key may also be absent when run() failed
            # before reaching its final storage write, hence the guard.
            if 'os_spawned_nodes_ids' in ctx.storage:
                del ctx.storage['os_spawned_nodes_ids']

            logger.info("OS spawned nodes has been successfully removed")
# @contextlib.contextmanager
# def suspend_vm_nodes_ctx(ctx: TestRun, unused_nodes: List[IRPCNode]) -> Iterator[List[int]]:
#
# pausable_nodes_ids = [cast(int, node.info.os_vm_id)
# for node in unused_nodes
# if node.info.os_vm_id is not None]
#
# non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
#
# if non_pausable:
# logger.warning("Can't pause {} nodes".format(non_pausable))
#
# if pausable_nodes_ids:
# logger.debug("Try to pause {} unused nodes".format(len(pausable_nodes_ids)))
# with ctx.get_pool() as pool:
# openstack_api.pause(ctx.os_connection, pausable_nodes_ids, pool)
#
# try:
# yield pausable_nodes_ids
# finally:
# if pausable_nodes_ids:
# logger.debug("Unpausing {} nodes".format(len(pausable_nodes_ids)))
# with ctx.get_pool() as pool:
# openstack_api.unpause(ctx.os_connection, pausable_nodes_ids, pool)
# def clouds_connect_stage(ctx: TestRun) -> None:
# TODO(koder): need to use this to connect to openstack in upper code
# conn = ctx.config['clouds/openstack']
# user, passwd, tenant = parse_creds(conn['creds'])
# auth_data = dict(auth_url=conn['auth_url'],
# username=user,
# api_key=passwd,
# project_id=tenant) # type: Dict[str, str]
# logger.debug("Discovering openstack nodes with connection details: %r", conn)
# connect to openstack, fuel
# # parse FUEL REST credentials
# username, tenant_name, password = parse_creds(fuel_data['creds'])
# creds = {"username": username,
# "tenant_name": tenant_name,
# "password": password}
#
# # connect to FUEL
# conn = fuel_rest_api.KeystoneAuth(fuel_data['url'], creds, headers=None)
# pass