refactoring is on the way
diff --git a/wally/run_test.py b/wally/run_test.py
index e62cfb1..d8ef685 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -1,16 +1,14 @@
import os
-import time
import logging
-import functools
import contextlib
-import collections
-from typing import List, Dict, Iterable, Any, Iterator, Mapping, Callable, Tuple, Optional, Union, cast
+from typing import List, Dict, Iterable, Iterator, Tuple, Optional, Union, cast
from concurrent.futures import ThreadPoolExecutor, Future
from .node_interfaces import NodeInfo, IRPCNode
from .test_run_class import TestRun
from .discover import discover
from . import pretty_yaml, utils, report, ssh_utils, start_vms, hw_info
+from .node import setup_rpc, connect
from .config import ConfigBlock, Config
from .suits.mysql import MysqlTest
@@ -31,18 +29,16 @@
logger = logging.getLogger("wally")
-def connect_all(nodes_info: List[NodeInfo],
- pool: ThreadPoolExecutor,
- conn_timeout: int = 30,
- rpc_conn_callback: ssh_utils.RPCBeforeConnCallback = None) -> List[IRPCNode]:
+def connect_all(nodes_info: List[NodeInfo], pool: ThreadPoolExecutor, conn_timeout: int = 30) -> List[IRPCNode]:
"""Connect to all nodes, log errors"""
logger.info("Connecting to %s nodes", len(nodes_info))
def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
try:
- ssh_node = ssh_utils.connect(node_info.ssh_conn_url, conn_timeout=conn_timeout)
- return True, ssh_utils.setup_rpc(ssh_node, rpc_conn_callback=rpc_conn_callback)
+ ssh_node = connect(node_info, conn_timeout=conn_timeout)
+ # TODO(koder): need to pass all required rpc bytes to this call
+ return True, setup_rpc(ssh_node, b"")
except Exception as exc:
logger.error("During connect to {}: {!s}".format(node, exc))
return False, node_info
@@ -77,16 +73,16 @@
return ready
-def collect_info_stage(ctx: TestRun, nodes: Iterable[IRPCNode]) -> None:
- futures = {} # type: Dict[str, Future]]
+def collect_info_stage(ctx: TestRun) -> None:
+ futures = {} # type: Dict[str, Future]
with ctx.get_pool() as pool:
- for node in nodes:
- hw_info_path = "hw_info/{}".format(node.node_id())
+ for node in ctx.nodes:
+ hw_info_path = "hw_info/{}".format(node.info.node_id())
if hw_info_path not in ctx.storage:
futures[hw_info_path] = pool.submit(hw_info.get_hw_info, node), node
- sw_info_path = "sw_info/{}".format(node.node_id())
+ sw_info_path = "sw_info/{}".format(node.info.node_id())
if sw_info_path not in ctx.storage:
futures[sw_info_path] = pool.submit(hw_info.get_sw_info, node)
@@ -95,7 +91,7 @@
@contextlib.contextmanager
-def suspend_vm_nodes_ctx(unused_nodes: List[IRPCNode]) -> Iterator[List[int]]:
+def suspend_vm_nodes_ctx(ctx: TestRun, unused_nodes: List[IRPCNode]) -> Iterator[List[int]]:
pausable_nodes_ids = [cast(int, node.info.os_vm_id)
for node in unused_nodes
@@ -108,14 +104,16 @@
if pausable_nodes_ids:
logger.debug("Try to pause {} unused nodes".format(len(pausable_nodes_ids)))
- start_vms.pause(pausable_nodes_ids)
+ with ctx.get_pool() as pool:
+ start_vms.pause(ctx.os_connection, pausable_nodes_ids, pool)
try:
yield pausable_nodes_ids
finally:
if pausable_nodes_ids:
logger.debug("Unpausing {} nodes".format(len(pausable_nodes_ids)))
- start_vms.unpause(pausable_nodes_ids)
+ with ctx.get_pool() as pool:
+ start_vms.unpause(ctx.os_connection, pausable_nodes_ids, pool)
def run_tests(ctx: TestRun, test_block: ConfigBlock, nodes: List[IRPCNode]) -> None:
@@ -133,7 +131,7 @@
# select test nodes
if vm_count is None:
curr_test_nodes = test_nodes
- unused_nodes = []
+ unused_nodes = [] # type: List[IRPCNode]
else:
curr_test_nodes = test_nodes[:vm_count]
unused_nodes = test_nodes[vm_count:]
@@ -147,23 +145,23 @@
# suspend all unused virtual nodes
if ctx.config.get('suspend_unused_vms', True):
- suspend_ctx = suspend_vm_nodes_ctx(unused_nodes)
+ suspend_ctx = suspend_vm_nodes_ctx(ctx, unused_nodes)
else:
suspend_ctx = utils.empty_ctx()
+ resumable_nodes_ids = [cast(int, node.info.os_vm_id)
+ for node in curr_test_nodes
+ if node.info.os_vm_id is not None]
+
+ if resumable_nodes_ids:
+ logger.debug("Check and unpause {} nodes".format(len(resumable_nodes_ids)))
+
+ with ctx.get_pool() as pool:
+ start_vms.unpause(ctx.os_connection, resumable_nodes_ids, pool)
+
with suspend_ctx:
- resumable_nodes_ids = [cast(int, node.info.os_vm_id)
- for node in curr_test_nodes
- if node.info.os_vm_id is not None]
-
- if resumable_nodes_ids:
- logger.debug("Check and unpause {} nodes".format(len(resumable_nodes_ids)))
- start_vms.unpause(resumable_nodes_ids)
-
test_cls = TOOL_TYPE_MAPPER[name]
-
remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
-
test_cfg = TestConfig(test_cls.__name__,
params=params,
run_uuid=ctx.config.run_uuid,
@@ -178,60 +176,81 @@
ctx.clear_calls_stack.append(disconnect_stage)
with ctx.get_pool() as pool:
- ctx.nodes = connect_all(ctx.nodes_info, pool, rpc_conn_callback=ctx.before_conn_callback)
+ ctx.nodes = connect_all(ctx.nodes_info, pool)
def discover_stage(ctx: TestRun) -> None:
"""discover clusters and nodes stage"""
+ # TODO(koder): Properly store discovery info and check if it available to skip phase
+
discover_info = ctx.config.get('discover')
if discover_info:
- discover_objs = [i.strip() for i in discover_info.strip().split(",")]
+ if "discovered_nodes" in ctx.storage:
+ nodes = ctx.storage.load_list("discovered_nodes", NodeInfo)
+ ctx.fuel_openstack_creds = ctx.storage.load("fuel_openstack_creds", start_vms.OSCreds)
+ else:
+ discover_objs = [i.strip() for i in discover_info.strip().split(",")]
- nodes_info = discover.discover(ctx, discover_objs,
- ctx.config.clouds,
- ctx.storage,
- not ctx.config.dont_discover_nodes)
+ ctx.fuel_openstack_creds, nodes = discover.discover(discover_objs,
+ ctx.config.clouds,
+ not ctx.config.dont_discover_nodes)
- ctx.nodes_info.extend(nodes_info)
+ ctx.storage["fuel_openstack_creds"] = ctx.fuel_openstack_creds # type: ignore
+ ctx.storage["discovered_nodes"] = nodes # type: ignore
+ ctx.nodes_info.extend(nodes)
for url, roles in ctx.config.get('explicit_nodes', {}).items():
- ctx.nodes_info.append(NodeInfo(url, set(roles.split(","))))
+ creds = ssh_utils.parse_ssh_uri(url)
+ roles = set(roles.split(","))
+ ctx.nodes_info.append(NodeInfo(creds, roles))
def save_nodes_stage(ctx: TestRun) -> None:
"""Save nodes list to file"""
- ctx.storage['nodes'] = ctx.nodes_info
+ ctx.storage['nodes'] = ctx.nodes_info # type: ignore
+
+
+def ensure_connected_to_openstack(ctx: TestRun) -> None:
+ if not ctx.os_connection is None:
+ if ctx.os_creds is None:
+ ctx.os_creds = get_OS_credentials(ctx)
+ ctx.os_connection = start_vms.os_connect(ctx.os_creds)
def reuse_vms_stage(ctx: TestRun) -> None:
- vms_patterns = ctx.config.get('clouds/openstack/vms', [])
- private_key_path = get_vm_keypair(ctx.config)['keypair_file_private']
+ if "reused_nodes" in ctx.storage:
+ ctx.nodes_info.extend(ctx.storage.load_list("reused_nodes", NodeInfo))
+ else:
+ reused_nodes = []
+ vms_patterns = ctx.config.get('clouds/openstack/vms', [])
+ private_key_path = get_vm_keypair_path(ctx.config)[0]
- for creds in vms_patterns:
- user_name, vm_name_pattern = creds.split("@", 1)
- msg = "Vm like {} lookup failed".format(vm_name_pattern)
+ for creds in vms_patterns:
+ user_name, vm_name_pattern = creds.split("@", 1)
+ msg = "Vm like {} lookup failed".format(vm_name_pattern)
- with utils.LogError(msg):
- msg = "Looking for vm with name like {0}".format(vm_name_pattern)
- logger.debug(msg)
+ with utils.LogError(msg):
+ msg = "Looking for vm with name like {0}".format(vm_name_pattern)
+ logger.debug(msg)
- if not start_vms.is_connected():
- os_creds = get_OS_credentials(ctx)
- else:
- os_creds = None
+ ensure_connected_to_openstack(ctx)
- conn = start_vms.nova_connect(os_creds)
- for ip, vm_id in start_vms.find_vms(conn, vm_name_pattern):
- conn_url = "ssh://{user}@{ip}::{key}".format(user=user_name,
- ip=ip,
- key=private_key_path)
- node_info = NodeInfo(conn_url, ['testnode'])
- node_info.os_vm_id = vm_id
- ctx.nodes_info.append(node_info)
+ for ip, vm_id in start_vms.find_vms(ctx.os_connection, vm_name_pattern):
+ creds = ssh_utils.ConnCreds(host=ip, user=user_name, key_file=private_key_path)
+ node_info = NodeInfo(creds, {'testnode'})
+ node_info.os_vm_id = vm_id
+ reused_nodes.append(node_info)
+ ctx.nodes_info.append(node_info)
+
+ ctx.storage["reused_nodes"] = reused_nodes # type: ignore
-def get_OS_credentials(ctx: TestRun) -> None:
+def get_OS_credentials(ctx: TestRun) -> start_vms.OSCreds:
+
+ if "openstack_openrc" in ctx.storage:
+ return ctx.storage.load("openstack_openrc", start_vms.OSCreds)
+
creds = None
os_creds = None
force_insecure = False
@@ -245,7 +264,7 @@
os_creds = start_vms.OSCreds(*creds_tuple)
elif 'ENV' in os_cfg:
logger.info("Using OS credentials from shell environment")
- os_creds = start_vms.ostack_get_creds()
+ os_creds = start_vms.get_openstack_credentials()
elif 'OS_TENANT_NAME' in os_cfg:
logger.info("Using predefined credentials")
os_creds = start_vms.OSCreds(os_cfg['OS_USERNAME'].strip(),
@@ -257,11 +276,10 @@
elif 'OS_INSECURE' in os_cfg:
force_insecure = os_cfg.get('OS_INSECURE', False)
- if os_creds is None and 'fuel' in cfg.clouds and \
- 'openstack_env' in cfg.clouds['fuel'] and \
- ctx.fuel_openstack_creds is not None:
+ if os_creds is None and 'fuel' in cfg.clouds and 'openstack_env' in cfg.clouds['fuel'] and \
+ ctx.fuel_openstack_creds is not None:
logger.info("Using fuel creds")
- creds = start_vms.OSCreds(**ctx.fuel_openstack_creds)
+ creds = ctx.fuel_openstack_creds
elif os_creds is None:
logger.error("Can't found OS credentials")
raise utils.StopTestError("Can't found OS credentials", None)
@@ -279,10 +297,11 @@
logger.debug(("OS_CREDS: user={0.name} tenant={0.tenant} " +
"auth_url={0.auth_url} insecure={0.insecure}").format(creds))
+ ctx.storage["openstack_openrc"] = creds # type: ignore
return creds
-def get_vm_keypair(cfg: Config) -> Tuple[str, str]:
+def get_vm_keypair_path(cfg: Config) -> Tuple[str, str]:
key_name = cfg.vm_configs['keypair_name']
private_path = os.path.join(cfg.settings_dir, key_name + "_private.pem")
public_path = os.path.join(cfg.settings_dir, key_name + "_public.pub")
@@ -291,52 +310,54 @@
@contextlib.contextmanager
def create_vms_ctx(ctx: TestRun, vm_config: ConfigBlock, already_has_count: int = 0) -> Iterator[List[NodeInfo]]:
- if vm_config['count'].startswith('='):
- count = int(vm_config['count'][1:])
- if count <= already_has_count:
- logger.debug("Not need new vms")
- yield []
- return
- if not start_vms.is_connected():
- os_creds = get_OS_credentials(ctx)
- else:
- os_creds = None
+ if 'spawned_vm_ids' in ctx.storage:
+ os_nodes_ids = ctx.storage.get('spawned_vm_ids', []) # type: List[int]
+ new_nodes = [] # type: List[NodeInfo]
- nova = start_vms.nova_connect(os_creds)
-
- os_nodes_ids = ctx.storage.get('spawned_vm_ids', []) # # type: List[int]
- new_nodes = [] # type: List[IRPCNode]
-
- if not os_nodes_ids:
- params = ctx.config.vm_configs[vm_config['cfg_name']].copy()
- params.update(vm_config)
- params.update(get_vm_keypair(ctx.config))
- params['group_name'] = ctx.config.run_uuid
- params['keypair_name'] = ctx.config.vm_configs['keypair_name']
-
- if not vm_config.get('skip_preparation', False):
- logger.info("Preparing openstack")
- start_vms.prepare_os(nova, params, os_creds)
- else:
# TODO(koder): reconnect to old VM's
raise NotImplementedError("Reconnect to old vms is not implemented")
+ else:
+ os_nodes_ids = []
+ new_nodes = []
+ no_spawn = False
+ if vm_config['count'].startswith('='):
+ count = int(vm_config['count'][1:])
+ if count <= already_has_count:
+ logger.debug("Not need new vms")
+ no_spawn = True
- already_has_count += len(os_nodes_ids)
- old_nodes = ctx.nodes[:]
+ if not no_spawn:
+ ensure_connected_to_openstack(ctx)
+ params = ctx.config.vm_configs[vm_config['cfg_name']].copy()
+ params.update(vm_config)
+ params.update(get_vm_keypair_path(ctx.config))
+ params['group_name'] = ctx.config.run_uuid
+ params['keypair_name'] = ctx.config.vm_configs['keypair_name']
- for node_info, node_id in start_vms.launch_vms(nova, params, already_has_count):
- node_info.roles.append('testnode')
- os_nodes_ids.append(node_id)
- new_nodes.append(node_info)
- ctx.storage['spawned_vm_ids'] = os_nodes_ids
+ if not vm_config.get('skip_preparation', False):
+ logger.info("Preparing openstack")
+ start_vms.prepare_os(ctx.os_connection, params)
- yield new_nodes
+ with ctx.get_pool() as pool:
+ for node_info in start_vms.launch_vms(ctx.os_connection, params, pool, already_has_count):
+ node_info.roles.add('testnode')
+ os_nodes_ids.append(node_info.os_vm_id)
+ new_nodes.append(node_info)
- # keep nodes in case of error for future test restart
- if not ctx.config.keep_vm:
- shut_down_vms_stage(ctx, os_nodes_ids)
- ctx.storage['spawned_vm_ids'] = []
+ ctx.storage['spawned_vm_ids'] = os_nodes_ids # type: ignore
+ yield new_nodes
+
+ # keep nodes in case of error for future test restart
+ if not ctx.config.keep_vm:
+ shut_down_vms_stage(ctx, os_nodes_ids)
+
+ del ctx.storage['spawned_vm_ids']
+
+
+@contextlib.contextmanager
+def sensor_monitoring(ctx: TestRun, cfg: ConfigBlock, nodes: List[IRPCNode]) -> Iterator[None]:
+ yield
def run_tests_stage(ctx: TestRun) -> None:
@@ -362,15 +383,18 @@
vm_ctx = utils.empty_ctx([])
tests = [group]
- with vm_ctx as new_nodes: # type: List[NodeInfo]
+ # make mypy happy
+ new_nodes = [] # type: List[NodeInfo]
+
+ with vm_ctx as new_nodes:
if new_nodes:
with ctx.get_pool() as pool:
- new_rpc_nodes = connect_all(new_nodes, pool, rpc_conn_callback=ctx.before_conn_callback)
+ new_rpc_nodes = connect_all(new_nodes, pool)
test_nodes = ctx.nodes + new_rpc_nodes
if ctx.config.get('sensors'):
- sensor_ctx = sensor_monitoring(ctx.config.get('sensors'), test_nodes)
+ sensor_ctx = sensor_monitoring(ctx, ctx.config.get('sensors'), test_nodes)
else:
sensor_ctx = utils.empty_ctx([])
@@ -386,13 +410,13 @@
def shut_down_vms_stage(ctx: TestRun, nodes_ids: List[int]) -> None:
if nodes_ids:
logger.info("Removing nodes")
- start_vms.clear_nodes(nodes_ids)
+ start_vms.clear_nodes(ctx.os_connection, nodes_ids)
logger.info("Nodes has been removed")
def clear_enviroment(ctx: TestRun) -> None:
shut_down_vms_stage(ctx, ctx.storage.get('spawned_vm_ids', []))
- ctx.storage['spawned_vm_ids'] = []
+ ctx.storage['spawned_vm_ids'] = [] # type: ignore
def disconnect_stage(ctx: TestRun) -> None: