test code working
diff --git a/wally/run_test.py b/wally/run_test.py
index 891df5a..52803d1 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -1,4 +1,5 @@
import time
+import json
import logging
from concurrent.futures import Future
from typing import List, Dict, Tuple, Optional, Union, cast
@@ -6,7 +7,7 @@
from . import utils, ssh_utils, hw_info
from .config import ConfigBlock
from .node import setup_rpc, connect
-from .node_interfaces import NodeInfo, IRPCNode
+from .node_interfaces import NodeInfo, IRPCNode, ISSHHost
from .stage import Stage, StepOrder
from .suits.io.fio import IOPerfTest
from .suits.itest import TestInputConfig
@@ -39,6 +40,7 @@
def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
try:
ssh_node = connect(node_info, conn_timeout=ctx.config.connect_timeout)
+
return True, setup_rpc(ssh_node,
ctx.rpc_code,
ctx.default_rpc_plugins,
@@ -66,7 +68,7 @@
logger.warning(msg.format(", ".join(map(str, failed_nodes))))
if failed_testnodes:
- msg = "Can't connect to testnode(s) " + ",".join(map(str, failed_testnodes))
+ msg = "Can't start RPC on testnode(s) " + ",".join(map(str, failed_testnodes))
logger.error(msg)
raise utils.StopTestError(msg)
@@ -74,9 +76,6 @@
logger.info("All nodes connected successfully")
def cleanup(self, ctx: TestRun) -> None:
- # TODO(koder): what next line was for?
- # ssh_utils.close_all_sessions()
-
if ctx.config.get("download_rpc_logs", False):
for node in ctx.nodes:
if node.rpc_log_file is not None:
@@ -94,7 +93,7 @@
class CollectInfoStage(Stage):
"""Collect node info"""
- priority = StepOrder.START_SENSORS - 1
+ priority = StepOrder.START_SENSORS - 2
config_block = 'collect_info'
def run(self, ctx: TestRun) -> None:
@@ -169,6 +168,52 @@
time.sleep(ctx.config.sleep)
+class PrepareNodes(Stage):
+    """Set the ceph scrub flags requested in the config and revert them in cleanup().
+
+    The 'ceph_settings' config value is read as a whitespace-separated list;
+    only the 'noscrub' and 'nodeepscrub' entries are handled here.
+    """
+    priority = StepOrder.START_SENSORS - 1
+
+    def __init__(self):
+        Stage.__init__(self)
+        # Each marker becomes True only when run() itself set the flag, so that
+        # cleanup() never unsets a flag that was already set before this stage.
+        self.nodeepscrub_updated = False
+        self.noscrub_updated = False
+
+    def run(self, ctx: TestRun) -> None:
+        """Set every requested scrub flag that is not already reported as set.
+
+        Only the first node in ctx.nodes having the 'ceph-mon' or 'ceph-osd'
+        role is used to query and change the flags.
+        """
+        ceph_sett = ctx.config.get('ceph_settings', "").split()
+        if not ceph_sett:
+            return
+
+        for node in ctx.nodes:
+            if "ceph-mon" in node.info.roles or "ceph-osd" in node.info.roles:
+                # NOTE(review): the membership tests below assume this value is
+                # text naming the flags currently set - confirm against the
+                # `ceph health --format json` output of the target ceph release.
+                state = json.loads(node.run("ceph health --format json"))["summary"]["summary"]
+                if 'noscrub' in ceph_sett:
+                    if 'noscrub' in state:
+                        logger.debug("noscrub already set on cluster")
+                    else:
+                        logger.info("Applying noscrub settings to ceph cluster")
+                        node.run("ceph osd set noscrub")
+                        self.noscrub_updated = True
+
+                if 'nodeepscrub' in ceph_sett:
+                    if 'nodeepscrub' in state:
+                        logger.debug("nodeepscrub already set on cluster")
+                    else:
+                        logger.info("Applying nodeepscrub settings to ceph cluster")
+                        # Must set nodeepscrub here: cleanup() unsets nodeepscrub
+                        # whenever nodeepscrub_updated is True.
+                        node.run("ceph osd set nodeepscrub")
+                        self.nodeepscrub_updated = True
+                # Stop after the first ceph node.
+                break
+
+    def cleanup(self, ctx: TestRun) -> None:
+        """Unset the flags that run() set, using the first ceph node found."""
+        if not (self.nodeepscrub_updated or self.noscrub_updated):
+            return
+
+        for node in ctx.nodes:
+            if "ceph-mon" in node.info.roles or "ceph-osd" in node.info.roles:
+                if self.noscrub_updated:
+                    logger.info("Reverting noscrub setting for ceph cluster")
+                    node.run("ceph osd unset noscrub")
+                    self.noscrub_updated = False
+
+                if self.nodeepscrub_updated:
+                    logger.info("Reverting nodeepscrub setting for ceph cluster")
+                    node.run("ceph osd unset nodeepscrub")
+                    self.nodeepscrub_updated = False
+                # Both markers are now cleared; no further node needs visiting.
+                break
+
+
class RunTestsStage(Stage):
priority = StepOrder.TEST