Move to new sensor selector, fix some bugs
diff --git a/wally/run_test.py b/wally/run_test.py
index 6fbefc4..578a65b 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -2,11 +2,10 @@
import json
import copy
import logging
-from concurrent.futures import Future
-from typing import List, Dict, Tuple, Optional, Union, cast
+from typing import List, Tuple, Optional, Union, cast
from cephlib.wally_storage import WallyDB
-from cephlib.node import NodeInfo, IRPCNode, get_hw_info, get_sw_info
+from cephlib.node import NodeInfo, IRPCNode, get_hw_info, get_sw_info, get_hostname
from cephlib.ssh import parse_ssh_uri
from cephlib.node_impl import setup_rpc, connect
@@ -80,7 +79,7 @@
delta = 0
if val > t_end:
delta = val - t_end
- elif t_start > val:
+ elif val < t_start:
delta = t_start - val
if delta > ctx.config.max_time_diff_ms:
@@ -91,9 +90,9 @@
ctx.config.max_time_diff_ms)
logger.error(msg)
raise utils.StopTestError(msg)
- if delta > 0:
- logger.warning("Node %s has time shift at least %s ms", node, delta)
+ if delta > 1:
+ logger.warning("Node %s has time shift at least %s ms", node, int(delta))
def cleanup(self, ctx: TestRun) -> None:
if ctx.config.get("download_rpc_logs", False):
@@ -123,43 +122,25 @@
class CollectInfoStage(Stage):
"""Collect node info"""
- priority = StepOrder.START_SENSORS - 2
+ priority = StepOrder.UPDATE_NODES_INFO
config_block = 'collect_info'
def run(self, ctx: TestRun) -> None:
- if not ctx.config.collect_info:
- return
-
- futures = {} # type: Dict[Tuple[str, str], Future]
-
with ctx.get_pool() as pool:
- # can't make next RPC request until finish with previous
- for node in ctx.nodes:
- nid = node.node_id
- hw_info_path = WallyDB.hw_info.format(node_id=nid)
- if hw_info_path not in ctx.storage:
- futures[(hw_info_path, nid)] = pool.submit(get_hw_info, node)
+ try:
+ # can't make the next RPC request to a node until the previous request to that same node has finished
+ for node, hw_info in zip(ctx.nodes, pool.map(get_hw_info, ctx.nodes)):
+ node.info.hw_info = hw_info
+ for node, sw_info in zip(ctx.nodes, pool.map(get_sw_info, ctx.nodes)):
+ node.info.sw_info = sw_info
+ except Exception as exc:
+ logger.exception("During collecting cluster info")
+ raise utils.StopTestError() from exc
- for (path, nid), future in futures.items():
- try:
- ctx.storage.put(future.result(), path)
- except Exception:
- logger.exception("During collecting hardware info from %s", nid)
- raise utils.StopTestError()
-
- futures.clear()
- for node in ctx.nodes:
- nid = node.node_id
- sw_info_path = WallyDB.sw_info.format(node_id=nid)
- if sw_info_path not in ctx.storage:
- futures[(sw_info_path, nid)] = pool.submit(get_sw_info, node)
-
- for (path, nid), future in futures.items():
- try:
- ctx.storage.put(future.result(), path)
- except Exception:
- logger.exception("During collecting software info from %s", nid)
- raise utils.StopTestError()
+ logger.debug("Collecting hostnames")
+ hostnames = pool.map(get_hostname, ctx.nodes)
+ for node, hostname in zip(ctx.nodes, hostnames):
+ node.info.hostname = hostname
class ExplicitNodesStage(Stage):