resume working
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index aae475c..78986f6 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -3,14 +3,15 @@
import logging
import os.path
import datetime
-from typing import Dict, Any, List, Optional, Tuple, cast
+from typing import Dict, Any, List, Optional, Callable, cast
from concurrent.futures import ThreadPoolExecutor, wait
from ..utils import Barrier, StopTestError, sec_to_str
from ..node_interfaces import IRPCNode
from ..storage import Storage
-from ..result_classes import NodeTestResults, TimeSerie
+from ..result_classes import NodeTestResults, IStorable
+from queue import Queue
logger = logging.getLogger("wally")
@@ -45,22 +46,23 @@
self.remote_dir = remote_dir
-class IterationConfig:
+class IterationConfig(IStorable):
name = None # type: str
summary = None # type: str
-class PerfTest:
+class PerfTest(metaclass=abc.ABCMeta):
"""Base class for all tests"""
name = None # type: str
max_retry = 3
retry_time = 30
- def __init__(self, config: TestInputConfig) -> None:
+ def __init__(self, config: TestInputConfig, on_idle: Callable[[], None] = None) -> None:
self.config = config
self.stop_requested = False
self.nodes = self.config.nodes # type: List[IRPCNode]
self.sorted_nodes_ids = sorted(node.info.node_id() for node in self.nodes)
+ self.on_idle = on_idle
def request_stop(self) -> None:
self.stop_requested = True
@@ -85,29 +87,22 @@
max_time_diff = 5
max_rel_time_diff = 0.05
- def __init__(self, config: TestInputConfig) -> None:
- PerfTest.__init__(self, config)
+ def __init__(self, *args, **kwargs) -> None:
+ PerfTest.__init__(self, *args, **kwargs)
self.iterations_configs = [None] # type: List[Optional[IterationConfig]]
+ self.storage_q = Queue() # type: Any
@abc.abstractmethod
def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
pass
def get_not_done_stages(self, storage: Storage) -> Dict[int, IterationConfig]:
- done_stages = [int(name_id.split("_")[1])
- for is_leaf, name_id in storage.list('result')
- if not is_leaf]
- if len(done_stages) == 0:
- start_run_id = 0
- else:
- start_run_id = max(done_stages) + 1
+ not_done = {} # type: Dict[int, IterationConfig]
- not_in_storage = {} # type: Dict[int, IterationConfig]
-
- for run_id, iteration_config in enumerate(self.iterations_configs[start_run_id:], start_run_id):
+ for run_id, iteration_config in enumerate(self.iterations_configs):
info_path = "result/{}/info".format(run_id)
if info_path in storage:
- info = cast(Dict[str, Any], storage[info_path]) # type: Dict[str, Any]
+ info = cast(Dict[str, Any], storage.get(info_path)) # type: Dict[str, Any]
assert isinstance(info, dict), \
"Broken storage at path {}. Expect test info dict, obtain {!r}".format(info_path, info)
@@ -120,7 +115,7 @@
expected_config = {
'name': self.name,
'iteration_name': iter_name,
- 'iteration_config': iteration_config,
+ 'iteration_config': iteration_config.raw(),
'params': self.config.params,
'nodes': self.sorted_nodes_ids
}
@@ -132,8 +127,9 @@
logger.info("Test iteration {} found in storage and will be skipped".format(iter_name))
else:
- not_in_storage[run_id] = iteration_config
- return not_in_storage
+ not_done[run_id] = iteration_config
+
+ return not_done
def run(self) -> None:
not_in_storage = self.get_not_done_stages(self.config.storage)
@@ -162,6 +158,7 @@
iter_name = "Unnamed" if iteration_config is None else iteration_config.name
logger.info("Run test iteration %s", iter_name)
+ current_result_path = "result/{}_{}".format(iteration_config.summary, run_id)
results = [] # type: List[NodeTestResults]
for idx in range(self.max_retry):
logger.debug("Prepare iteration %s", iter_name)
@@ -175,12 +172,8 @@
try:
futures = []
for node in self.nodes:
- path = "result/{}_{}/measurement/{}".format(iteration_config.summary,
- run_id,
- node.info.node_id())
- self.config.storage.clear(path)
- mstorage = self.config.storage.sub_storage(path)
- futures.append(pool.submit(self.run_iteration, node, iteration_config, mstorage))
+ path = "{}/measurement/{}".format(current_result_path, node.info.node_id())
+ futures.append(pool.submit(self.run_iteration, node, iteration_config, path))
results = [fut.result() for fut in futures]
break
@@ -196,11 +189,7 @@
stop_times = [] # type: List[int]
# TODO: FIX result processing - NodeTestResults
- result = None # type: NodeTestResults
for result in results:
- mstorage = self.config.storage.sub_storage("result/{}_{}/measurement/{}"
- .format(result.summary, run_id, result.node_id))
- serie = None # type: TimeSerie
for name, serie in result.series.items():
start_times.append(serie.start_at)
stop_times.append(serie.step * len(serie.data))
@@ -224,14 +213,43 @@
test_config = {
'name': self.name,
'iteration_name': iter_name,
- 'iteration_config': iteration_config,
+ 'iteration_config': iteration_config.raw(),
'params': self.config.params,
'nodes': self.sorted_nodes_ids,
'begin_time': min_start_time,
'end_time': max_stop_time
}
- self.config.storage["result", str(run_id), "info"] = test_config # type: ignore
+ self.process_storage_queue()
+ self.config.storage.put(test_config, "result", str(run_id), "info")
+
+ if "all_results" in self.config.storage:
+ all_results = self.config.storage.get("all_results")
+ else:
+ all_results = []
+
+ all_results.append([self.name, iteration_config.summary, current_result_path])
+ self.config.storage.put(all_results, "all_results")
+ self.config.storage.sync()
+
+ if self.on_idle is not None:
+ self.on_idle()
+
+ def store_data(self, val: Any, type: str, prefix: str, *path: str) -> None:
+ self.storage_q.put((val, type, prefix, path))
+
+ def process_storage_queue(self) -> None:
+ while not self.storage_q.empty():
+ value, val_type, subpath, val_path = self.storage_q.get()
+ if val_type == 'raw':
+ self.config.storage.put_raw(value, subpath, *val_path)
+ elif val_type == 'yaml':
+ self.config.storage.put(value, subpath, *val_path)
+ elif val_type == 'array':
+ self.config.storage.put_array(value, subpath, *val_path)
+ else:
+ logger.error("Internal logic error - unknown data stop type {!r}".format(val_path))
+ raise StopTestError()
@abc.abstractmethod
def config_node(self, node: IRPCNode) -> None:
@@ -242,7 +260,7 @@
pass
@abc.abstractmethod
- def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
+ def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, stor_prefix: str) -> NodeTestResults:
pass
@@ -269,7 +287,7 @@
def prepare_iteration(self, node: IRPCNode, iter_config: IterationConfig) -> None:
pass
- def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
+ def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, stor_prefix: str) -> NodeTestResults:
# TODO: have to store logs
cmd = self.join_remote(self.run_script)
cmd += ' ' + self.config.params.get('run_opts', '')