move even more code to cephlib
diff --git a/wally/run_test.py b/wally/run_test.py
index 4e02a75..6fbefc4 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -5,10 +5,13 @@
from concurrent.futures import Future
from typing import List, Dict, Tuple, Optional, Union, cast
-from . import utils, ssh_utils, hw_info
+from cephlib.wally_storage import WallyDB
+from cephlib.node import NodeInfo, IRPCNode, get_hw_info, get_sw_info
+from cephlib.ssh import parse_ssh_uri
+from cephlib.node_impl import setup_rpc, connect
+
+from . import utils
from .config import ConfigBlock
-from .node import setup_rpc, connect
-from .node_interfaces import NodeInfo, IRPCNode
from .stage import Stage, StepOrder
from .sensors import collect_sensors_data
from .suits.all_suits import all_suits
@@ -103,7 +106,7 @@
node.conn.cli.killall()
if node.rpc_log_file is not None:
nid = node.node_id
- path = "rpc_logs/{}.txt".format(nid)
+ path = WallyDB.rpc_logs.format(node_id=nid)
node.conn.server.flush_logs()
log = node.get_file_content(node.rpc_log_file)
if path in ctx.storage:
@@ -133,9 +136,9 @@
# can't make next RPC request until finish with previous
for node in ctx.nodes:
nid = node.node_id
- hw_info_path = "hw_info/{}".format(nid)
+ hw_info_path = WallyDB.hw_info.format(node_id=nid)
if hw_info_path not in ctx.storage:
- futures[(hw_info_path, nid)] = pool.submit(hw_info.get_hw_info, node)
+ futures[(hw_info_path, nid)] = pool.submit(get_hw_info, node)
for (path, nid), future in futures.items():
try:
@@ -147,9 +150,9 @@
futures.clear()
for node in ctx.nodes:
nid = node.node_id
- sw_info_path = "sw_info/{}".format(nid)
+ sw_info_path = WallyDB.sw_info.format(node_id=nid)
if sw_info_path not in ctx.storage:
- futures[(sw_info_path, nid)] = pool.submit(hw_info.get_sw_info, node)
+ futures[(sw_info_path, nid)] = pool.submit(get_sw_info, node)
for (path, nid), future in futures.items():
try:
@@ -166,12 +169,12 @@
config_block = 'nodes'
def run(self, ctx: TestRun) -> None:
- if 'all_nodes' in ctx.storage:
+ if WallyDB.all_nodes in ctx.storage:
logger.info("Skip explicid nodes filling, as all_nodes all ready in storage")
return
for url, roles in ctx.config.get('nodes', {}).raw().items():
- ctx.merge_node(ssh_utils.parse_ssh_uri(url), set(role.strip() for role in roles.split(",")))
+ ctx.merge_node(parse_ssh_uri(url), set(role.strip() for role in roles.split(",")))
logger.debug("Add node %s with roles %s", url, roles)
@@ -293,8 +296,6 @@
class SaveNodesStage(Stage):
"""Save nodes list to file"""
- nodes_path = 'all_nodes'
- params_path = 'all_nodes_params.js'
priority = StepOrder.UPDATE_NODES_INFO + 1
def run(self, ctx: TestRun) -> None:
@@ -302,27 +303,20 @@
params = {node.node_id: node.params for node in infos}
ninfos = [copy.copy(node) for node in infos]
for node in ninfos:
- node.params = "in {!r} file".format(self.params_path)
- ctx.storage.put_list(ninfos, self.nodes_path)
- ctx.storage.put_raw(json.dumps(params).encode('utf8'), self.params_path)
+ node.params = {"in file": WallyDB.nodes_params}
+ ctx.storage.put_list(ninfos, WallyDB.all_nodes)
+ ctx.storage.put_raw(json.dumps(params).encode('utf8'), WallyDB.nodes_params)
class LoadStoredNodesStage(Stage):
priority = StepOrder.DISCOVER
def run(self, ctx: TestRun) -> None:
- if SaveNodesStage.nodes_path in ctx.storage:
+ if WallyDB.all_nodes in ctx.storage:
if ctx.nodes_info:
logger.error("Internal error: Some nodes already stored in " +
"nodes_info before LoadStoredNodesStage stage")
raise utils.StopTestError()
- nodes = {node.node_id: node for node in ctx.storage.load_list(NodeInfo, SaveNodesStage.nodes_path)}
-
- if SaveNodesStage.params_path in ctx.storage:
- params = json.loads(ctx.storage.get_raw(SaveNodesStage.params_path).decode('utf8'))
- for node_id, node in nodes.items():
- node.params = params.get(node_id, {})
-
- ctx.nodes_info = nodes
+ ctx.nodes_info = {node.node_id: node for node in ctx.rstorage.load_nodes()}
logger.info("%s nodes loaded from database", len(ctx.nodes_info))