Skeleton and sensors work
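
Pass the configured log level through to the remote RPC agent, optionally
download per-node RPC logs into the result storage on disconnect, collect
hardware/software info in two sequential passes with per-node error
reporting, register explicit nodes via ctx.merge_node(), and add a
SleepStage driven by a new 'sleep' config block.

A config snippet enabling the new options might look like the sketch
below (the exact schema is assumed here, not defined by this patch):

    rpc_log_level: DEBUG      # forwarded to setup_rpc()
    download_rpc_logs: true   # store rpc_logs/<node_id> on disconnect
    sleep: 60                 # seconds; consumed by SleepStage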
diff --git a/wally/run_test.py b/wally/run_test.py
index 8b54f8b..7baac35 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -1,3 +1,4 @@
+import time
 import logging
 from concurrent.futures import Future
 from typing import List, Dict, Tuple, Optional, Union, cast
@@ -38,16 +39,19 @@
         def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
             try:
                 ssh_node = connect(node_info, conn_timeout=ctx.config.connect_timeout)
-                return True, setup_rpc(ssh_node, ctx.rpc_code, ctx.default_rpc_plugins)
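+                # forward the configured RPC log level so the remote agent's
+                # verbosity matches this run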
+                return True, setup_rpc(ssh_node,
+                                       ctx.rpc_code,
+                                       ctx.default_rpc_plugins,
+                                       log_level=ctx.config.rpc_log_level)
             except Exception as exc:
-                logger.error("During connect to {}: {!s}".format(node, exc))
+                logger.exception("During connect to %s: %s", node_info, exc)
                 return False, node_info
 
         failed_testnodes = []  # type: List[NodeInfo]
         failed_nodes = []  # type: List[NodeInfo]
         ctx.nodes = []
 
-        for ok, node in pool.map(connect_ext, ctx.nodes_info):
+        for ok, node in pool.map(connect_ext, ctx.nodes_info.values()):
             if not ok:
                 node = cast(NodeInfo, node)
                 if 'testnode' in node.roles:
@@ -59,11 +63,10 @@
 
         if failed_nodes:
             msg = "Node(s) {} would be excluded - can't connect"
-            logger.warning(msg.format(",".join(map(str, failed_nodes))))
+            logger.warning(msg.format(", ".join(map(str, failed_nodes))))
 
         if failed_testnodes:
-            msg = "Can't connect to testnode(s) " + \
-                  ",".join(map(str, failed_testnodes))
+            msg = "Can't connect to testnode(s) " + ",".join(map(str, failed_testnodes))
             logger.error(msg)
             raise utils.StopTestError(msg)
 
@@ -74,8 +77,18 @@
         # TODO(koder): what next line was for?
         # ssh_utils.close_all_sessions()
 
-        for node in ctx.nodes:
-            node.disconnect()
+        if ctx.config.get("download_rpc_logs", False):
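+            # pull each node's RPC log and persist it into the result storage
+            # under rpc_logs/<node_id>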
+            for node in ctx.nodes:
+                if node.rpc_log_file is not None:
+                    nid = node.info.node_id()
+                    path = "rpc_logs/" + nid
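+                    # flush the agent's buffered log records first so the
+                    # downloaded file is complete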
+                    node.conn.server.flush_logs()
+                    log = node.get_file_content(node.rpc_log_file)
+                    ctx.storage[path] = log.decode("utf8")
+                    logger.debug("RPC log from node %s stored into storage::%s", nid, path)
+
+        with ctx.get_pool() as pool:
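+            # disconnect from all nodes in parallel; stop=True presumably also
+            # shuts down the remote RPC agent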
+            list(pool.map(lambda node: node.disconnect(stop=True), ctx.nodes))
 
 
 class CollectInfoStage(Stage):
@@ -88,20 +101,36 @@
         if not ctx.config.collect_info:
             return
 
-        futures = {}  # type: Dict[str, Future]
+        futures = {}  # type: Dict[Tuple[str, str], Future]
 
         with ctx.get_pool() as pool:
+            # can't issue the next RPC request to a node until the previous one
+            # finishes, so hardware and software info are collected in two passes
             for node in ctx.nodes:
-                hw_info_path = "hw_info/{}".format(node.info.node_id())
+                nid = node.info.node_id()
+                hw_info_path = "hw_info/{}".format(nid)
                 if hw_info_path not in ctx.storage:
-                    futures[hw_info_path] = pool.submit(hw_info.get_hw_info, node), node
+                    futures[(hw_info_path, nid)] = pool.submit(hw_info.get_hw_info, node)
 
-                sw_info_path = "sw_info/{}".format(node.info.node_id())
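+            # wait for all hardware-info futures before issuing the software-info requests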
+            for (path, nid), future in futures.items():
+                try:
+                    ctx.storage[path] = future.result()
+                except Exception:
+                    logger.exception("During collecting hardware info from %s", nid)
+                    raise utils.StopTestError()
+
+            futures.clear()
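+            # second pass: software info, again one outstanding request per node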
+            for node in ctx.nodes:
+                nid = node.info.node_id()
+                sw_info_path = "sw_info/{}".format(nid)
                 if sw_info_path not in ctx.storage:
-                    futures[sw_info_path] = pool.submit(hw_info.get_sw_info, node)
+                    futures[(sw_info_path, nid)] = pool.submit(hw_info.get_sw_info, node)
 
-            for path, future in futures.items():
-                ctx.storage[path] = future.result()
+            for (path, nid), future in futures.items():
+                try:
+                    ctx.storage[path] = future.result()
+                except Exception:
+                    logger.exception("During collecting software info from %s", nid)
+                    raise utils.StopTestError()
 
 
 class ExplicitNodesStage(Stage):
@@ -111,14 +140,12 @@
     config_block = 'nodes'
 
     def run(self, ctx: TestRun) -> None:
-        explicit_nodes = []
-        for url, roles in ctx.config.get('explicit_nodes', {}).items():
-            creds = ssh_utils.parse_ssh_uri(url)
-            roles = set(roles.split(","))
-            explicit_nodes.append(NodeInfo(creds, roles))
+        if 'all_nodes' in ctx.storage:
+            logger.info("Skip explicit nodes filling, as all_nodes is already in storage")
+            return
 
-        ctx.nodes_info.extend(explicit_nodes)
-        ctx.storage['explicit_nodes'] = explicit_nodes  # type: ignore
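+        # merge_node registers each node in ctx.nodes_info, combining roles
+        # with any existing entry for the same credentials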
+        for url, roles in ctx.config.get('explicit_nodes', {}).items():
+            ctx.merge_node(ssh_utils.parse_ssh_uri(url), set(roles.split(",")))
 
 
 class SaveNodesStage(Stage):
@@ -127,7 +154,18 @@
     priority = StepOrder.CONNECT
 
     def run(self, ctx: TestRun) -> None:
-        ctx.storage['all_nodes'] = ctx.nodes_info  # type: ignore
+        ctx.storage['all_nodes'] = list(ctx.nodes_info.values())  # type: ignore
+
+
+class SleepStage(Stage):
+    """Sleep for specified amount of seconds"""
+
+    priority = StepOrder.TEST
+    config_block = 'sleep'
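+    # config_block gating: this stage presumably runs only when the config
+    # defines a 'sleep' entry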
+
+    def run(self, ctx: TestRun) -> None:
+        logger.debug("Will sleep for %r seconds", ctx.config.sleep)
+        time.sleep(ctx.config.sleep)
 
 
 class RunTestsStage(Stage):