resume work: queue iteration results and flush them to storage from the main thread
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index ee34af4..fb18165 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,7 +1,7 @@
import array
import os.path
import logging
-from typing import cast, Dict
+from typing import cast, Any
import wally
@@ -12,7 +12,7 @@
from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams, get_log_files, get_test_summary
from . import rpc_plugin
from .fio_hist import expected_lat_bins
-from ...storage import Storage
+
logger = logging.getLogger("wally")
@@ -21,6 +21,7 @@
soft_runcycle = 5 * 60
retry_time = 30
configs_dir = os.path.dirname(__file__) # type: str
+ name = 'fio'
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
@@ -136,8 +137,9 @@
node.put_to_file(self.remote_task_file, str(iter_config).encode("utf8"))
# TODO: get a link to substorage as a parameter
- def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
- exec_time = execution_time(cast(FioJobSection, iter_config))
+ def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, stor_prefix: str) -> NodeTestResults:
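+ # fio iteration configs are always FioJobSection instances - cast once and reuse below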
+ f_iter_config = cast(FioJobSection, iter_config)
+ exec_time = execution_time(f_iter_config)
fio_cmd_templ = "cd {exec_folder}; " + \
"{fio_path} --output-format=json --output={out_file} --alloc-size=262144 {job_file}"
@@ -151,12 +153,10 @@
if must_be_empty:
logger.error("Unexpected fio output: %r", must_be_empty)
- res = NodeTestResults(self.__class__.__name__,
- node.info.node_id(),
- get_test_summary(cast(FioJobSection, iter_config)))
+ res = NodeTestResults(self.__class__.__name__, node.info.node_id(), get_test_summary(f_iter_config))
res.extra_logs['fio'] = node.get_file_content(self.remote_output_file)
- substorage.store_raw(res.extra_logs['fio'], "fio_raw")
+ self.store_data(res.extra_logs['fio'], "raw", stor_prefix, "fio_raw")
node.conn.fs.unlink(self.remote_output_file)
files = [name for name in node.conn.fs.listdir(self.exec_folder)]
@@ -164,7 +164,7 @@
expected_time_delta = 1000 # 1000ms == 1s
max_time_diff = 50 # 50ms - 5%
- for name, path in get_log_files(cast(FioJobSection, iter_config)):
+ for name, path in get_log_files(f_iter_config):
log_files = [fname for fname in files if fname.startswith(path)]
if len(log_files) != 1:
logger.error("Found %s files, match log pattern %s(%s) - %s",
@@ -173,7 +173,7 @@
fname = os.path.join(self.exec_folder, log_files[0])
raw_result = node.get_file_content(fname) # type: bytes
- substorage.store_raw(raw_result, "{}_raw".format(name))
+ self.store_data(raw_result, "raw", stor_prefix, "{}_raw".format(name))
node.conn.fs.unlink(fname)
try:
@@ -222,7 +222,10 @@
step=expected_time_delta,
data=parsed)
- substorage.set_array(parsed, "{}_data".format(name)) # type: ignore
- substorage["{}_meta".format(name)] = res.series[name].meta() # type: ignore
+ self.store_data(parsed, "array", stor_prefix, "{}_data".format(name))
+ self.store_data(res.series[name].meta(), "yaml", stor_prefix, "{}_meta".format(name))
return res
+
+ def format_for_console(self, data: Any) -> str:
+ raise NotImplementedError()
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 8390e3a..6790c97 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -11,6 +11,7 @@
from collections import OrderedDict
+from ...result_classes import IStorable
from ..itest import IterationConfig
from ...utils import sec_to_str, ssize2b
@@ -37,7 +38,9 @@
("vm_count", int)])
-class FioJobSection(IterationConfig):
+class FioJobSection(IterationConfig, IStorable):
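+ # yaml tag under which this section is stored and restored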
+ yaml_tag = 'fio_job'
+
def __init__(self, name: str) -> None:
self.name = name
self.vals = OrderedDict() # type: Dict[str, Any]
@@ -67,6 +70,20 @@
return res
+ def raw(self) -> Dict[str, Any]:
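+ # plain-dict representation used when this section is written to storage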
+ return {
+ 'name': self.name,
+ 'vals': list(map(list, self.vals.items())),
+ 'summary': self.summary
+ }
+
+ @classmethod
+ def fromraw(cls, data: Dict[str, Any]) -> 'FioJobSection':
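+ # inverse of raw(): rebuild a section from its stored dict form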
+ obj = cls(data['name'])
+ obj.summary = data['summary']
+ obj.vals.update(data['vals'])
+ return obj
+
class ParseError(ValueError):
def __init__(self, msg: str, fname: str, lineno: int, line_cont:Optional[str] = "") -> None:
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index 1075aea..d32d6a8 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -1,8 +1,14 @@
[global]
include defaults_qd.cfg
-ramp_time=5
-runtime=5
+ramp_time=0
+runtime=4
[test_{TEST_SUMM}]
blocksize=60k
rw=randread
+iodepth=1
+
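+# same job repeated at a higher queue depth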
+[test_{TEST_SUMM}]
+iodepth=16
+blocksize=60k
+rw=randread
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index aae475c..78986f6 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -3,14 +3,15 @@
import logging
import os.path
import datetime
-from typing import Dict, Any, List, Optional, Tuple, cast
+from typing import Dict, Any, List, Optional, Callable, cast
from concurrent.futures import ThreadPoolExecutor, wait
from ..utils import Barrier, StopTestError, sec_to_str
from ..node_interfaces import IRPCNode
from ..storage import Storage
-from ..result_classes import NodeTestResults, TimeSerie
+from ..result_classes import NodeTestResults, IStorable
+from queue import Queue
logger = logging.getLogger("wally")
@@ -45,22 +46,23 @@
self.remote_dir = remote_dir
-class IterationConfig:
+class IterationConfig(IStorable):
name = None # type: str
summary = None # type: str
-class PerfTest:
+class PerfTest(metaclass=abc.ABCMeta):
"""Base class for all tests"""
name = None # type: str
max_retry = 3
retry_time = 30
- def __init__(self, config: TestInputConfig) -> None:
+ def __init__(self, config: TestInputConfig, on_idle: Optional[Callable[[], None]] = None) -> None:
self.config = config
self.stop_requested = False
self.nodes = self.config.nodes # type: List[IRPCNode]
self.sorted_nodes_ids = sorted(node.info.node_id() for node in self.nodes)
+ self.on_idle = on_idle
def request_stop(self) -> None:
self.stop_requested = True
@@ -85,29 +87,22 @@
max_time_diff = 5
max_rel_time_diff = 0.05
- def __init__(self, config: TestInputConfig) -> None:
- PerfTest.__init__(self, config)
+ def __init__(self, *args, **kwargs) -> None:
+ PerfTest.__init__(self, *args, **kwargs)
self.iterations_configs = [None] # type: List[Optional[IterationConfig]]
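+ # results produced by run_iteration() in worker threads are queued here and written out in the main thread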
+ self.storage_q = Queue() # type: Any
@abc.abstractmethod
def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
pass
def get_not_done_stages(self, storage: Storage) -> Dict[int, IterationConfig]:
- done_stages = [int(name_id.split("_")[1])
- for is_leaf, name_id in storage.list('result')
- if not is_leaf]
- if len(done_stages) == 0:
- start_run_id = 0
- else:
- start_run_id = max(done_stages) + 1
+ not_done = {} # type: Dict[int, IterationConfig]
- not_in_storage = {} # type: Dict[int, IterationConfig]
-
- for run_id, iteration_config in enumerate(self.iterations_configs[start_run_id:], start_run_id):
+ for run_id, iteration_config in enumerate(self.iterations_configs):
info_path = "result/{}/info".format(run_id)
if info_path in storage:
- info = cast(Dict[str, Any], storage[info_path]) # type: Dict[str, Any]
+ info = cast(Dict[str, Any], storage.get(info_path)) # type: Dict[str, Any]
assert isinstance(info, dict), \
"Broken storage at path {}. Expect test info dict, obtain {!r}".format(info_path, info)
@@ -120,7 +115,7 @@
expected_config = {
'name': self.name,
'iteration_name': iter_name,
- 'iteration_config': iteration_config,
+ 'iteration_config': iteration_config.raw(),
'params': self.config.params,
'nodes': self.sorted_nodes_ids
}
@@ -132,8 +127,9 @@
logger.info("Test iteration {} found in storage and will be skipped".format(iter_name))
else:
- not_in_storage[run_id] = iteration_config
- return not_in_storage
+ not_done[run_id] = iteration_config
+
+ return not_done
def run(self) -> None:
not_in_storage = self.get_not_done_stages(self.config.storage)
@@ -162,6 +158,7 @@
iter_name = "Unnamed" if iteration_config is None else iteration_config.name
logger.info("Run test iteration %s", iter_name)
+ current_result_path = "result/{}_{}".format(iteration_config.summary, run_id)
results = [] # type: List[NodeTestResults]
for idx in range(self.max_retry):
logger.debug("Prepare iteration %s", iter_name)
@@ -175,12 +172,8 @@
try:
futures = []
for node in self.nodes:
- path = "result/{}_{}/measurement/{}".format(iteration_config.summary,
- run_id,
- node.info.node_id())
- self.config.storage.clear(path)
- mstorage = self.config.storage.sub_storage(path)
- futures.append(pool.submit(self.run_iteration, node, iteration_config, mstorage))
+ path = "{}/measurement/{}".format(current_result_path, node.info.node_id())
+ futures.append(pool.submit(self.run_iteration, node, iteration_config, path))
results = [fut.result() for fut in futures]
break
@@ -196,11 +189,7 @@
stop_times = [] # type: List[int]
# TODO: FIX result processing - NodeTestResults
- result = None # type: NodeTestResults
for result in results:
- mstorage = self.config.storage.sub_storage("result/{}_{}/measurement/{}"
- .format(result.summary, run_id, result.node_id))
- serie = None # type: TimeSerie
for name, serie in result.series.items():
start_times.append(serie.start_at)
stop_times.append(serie.step * len(serie.data))
@@ -224,14 +213,43 @@
test_config = {
'name': self.name,
'iteration_name': iter_name,
- 'iteration_config': iteration_config,
+ 'iteration_config': iteration_config.raw(),
'params': self.config.params,
'nodes': self.sorted_nodes_ids,
'begin_time': min_start_time,
'end_time': max_stop_time
}
- self.config.storage["result", str(run_id), "info"] = test_config # type: ignore
+ self.process_storage_queue()
+ self.config.storage.put(test_config, "result", str(run_id), "info")
+
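+ # maintain a global index of completed iterations in "all_results"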
+ if "all_results" in self.config.storage:
+ all_results = self.config.storage.get("all_results")
+ else:
+ all_results = []
+
+ all_results.append([self.name, iteration_config.summary, current_result_path])
+ self.config.storage.put(all_results, "all_results")
+ self.config.storage.sync()
+
+ if self.on_idle is not None:
+ self.on_idle()
+
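+ # called from run_iteration() in worker threads; the actual write happens later in process_storage_queue()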
+ def store_data(self, val: Any, val_type: str, prefix: str, *path: str) -> None:
+ self.storage_q.put((val, val_type, prefix, path))
+
+ def process_storage_queue(self) -> None:
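+ # drain the queue in the main thread, dispatching each item on its declared type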
+ while not self.storage_q.empty():
+ value, val_type, subpath, val_path = self.storage_q.get()
+ if val_type == 'raw':
+ self.config.storage.put_raw(value, subpath, *val_path)
+ elif val_type == 'yaml':
+ self.config.storage.put(value, subpath, *val_path)
+ elif val_type == 'array':
+ self.config.storage.put_array(value, subpath, *val_path)
+ else:
+ logger.error("Internal logic error - unknown data stop type {!r}".format(val_path))
+ raise StopTestError()
@abc.abstractmethod
def config_node(self, node: IRPCNode) -> None:
@@ -242,7 +260,7 @@
pass
@abc.abstractmethod
- def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
+ def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, stor_prefix: str) -> NodeTestResults:
pass
@@ -269,7 +287,7 @@
def prepare_iteration(self, node: IRPCNode, iter_config: IterationConfig) -> None:
pass
- def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
+ def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, stor_prefix: str) -> NodeTestResults:
# TODO: have to store logs
cmd = self.join_remote(self.run_script)
cmd += ' ' + self.config.params.get('run_opts', '')