resume working
diff --git a/wally/run_test.py b/wally/run_test.py
index 52803d1..8e8a4e9 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -7,14 +7,17 @@
from . import utils, ssh_utils, hw_info
from .config import ConfigBlock
from .node import setup_rpc, connect
-from .node_interfaces import NodeInfo, IRPCNode, ISSHHost
+from .node_interfaces import NodeInfo, IRPCNode
from .stage import Stage, StepOrder
+from .sensors import collect_sensors_data
from .suits.io.fio import IOPerfTest
from .suits.itest import TestInputConfig
from .suits.mysql import MysqlTest
from .suits.omgbench import OmgTest
from .suits.postgres import PgBenchTest
from .test_run_class import TestRun
+from .statistic import calc_stat_props
+from .utils import StopTestError
TOOL_TYPE_MAPPER = {
@@ -83,7 +86,11 @@
path = "rpc_logs/" + nid
node.conn.server.flush_logs()
log = node.get_file_content(node.rpc_log_file)
- ctx.storage.store_raw(log, path)
+ if path in ctx.storage:
+ previous = ctx.storage.get_raw(path)
+ else:
+ previous = b""
+ ctx.storage.put_raw(previous + log, path)
logger.debug("RPC log from node {} stored into storage::{}".format(nid, path))
with ctx.get_pool() as pool:
@@ -112,7 +119,7 @@
for (path, nid), future in futures.items():
try:
- ctx.storage[path] = future.result()
+ ctx.storage.put(future.result(), path)
except Exception:
logger.exception("During collecting hardware info from %s", nid)
raise utils.StopTestError()
@@ -126,7 +133,7 @@
for (path, nid), future in futures.items():
try:
- ctx.storage[path] = future.result()
+ ctx.storage.put(future.result(), path)
except Exception:
logger.exception("During collecting software info from %s", nid)
raise utils.StopTestError()
@@ -154,7 +161,7 @@
priority = StepOrder.CONNECT
def run(self, ctx: TestRun) -> None:
- ctx.storage['all_nodes'] = list(ctx.nodes_info.values()) # type: ignore
+ ctx.storage.put_list(ctx.nodes_info.values(), 'all_nodes')
class SleepStage(Stage):
@@ -226,7 +233,7 @@
if not test_nodes:
logger.error("No test nodes found")
- return
+ raise StopTestError()
for name, params in test_group.items():
vm_count = params.get('node_limit', None) # type: Optional[int]
@@ -250,8 +257,46 @@
storage=ctx.storage,
remote_dir=remote_dir)
- test_cls(test_cfg).run()
+ test_cls(test_cfg,
+ on_idle=lambda: collect_sensors_data(ctx, False)).run()
@classmethod
def validate_config(cls, cfg: ConfigBlock) -> None:
pass
+
+
+class CalcStatisticStage(Stage):
+ priority = StepOrder.TEST + 1
+
+ def run(self, ctx: TestRun) -> None:
+ results = {}
+ for name, summary, stor_path in ctx.storage.get("all_results"):
+ if name == 'fio':
+ test_info = ctx.storage.get(stor_path, "info")
+ for node in test_info['nodes']:
+ iops = ctx.storage.get_array(stor_path, node, 'iops_data')
+ bw = ctx.storage.get_array(stor_path, node, 'bw_data')
+ lat = ctx.storage.get_array(stor_path, node, 'lat_data')
+ results[summary] = (iops, bw, lat)
+
+ for name, (iops, bw, lat) in results.items():
+ print(" ------------------- IOPS -------------------")
+ print(calc_stat_props(iops)) # type: ignore
+ print(" ------------------- BW -------------------")
+ print(calc_stat_props(bw)) # type: ignore
+ # print(" ------------------- LAT -------------------")
+ # print(calc_stat_props(lat))
+
+
+class LoadStoredNodesStage(Stage):
+ priority = StepOrder.DISCOVER
+
+ def run(self, ctx: TestRun) -> None:
+ if 'all_nodes' in ctx.storage:
+ if ctx.nodes_info:
+ logger.error("Internal error: Some nodes already stored in " +
+ "nodes_info before LoadStoredNodesStage stage")
+ raise StopTestError()
+ ctx.nodes_info = {node.node_id(): node
+ for node in ctx.storage.load_list(NodeInfo, "all_nodes")}
+ logger.info("%s nodes loaded from database", len(ctx.nodes_info))