Get test code working: rework fio result collection

Each test iteration now stores raw fio output and the parsed iops/bw/latency
histogram logs per node under result/{summary}_{run_id}/measurement/{node_id};
do_test() is split into prepare_iteration() and run_iteration().
diff --git a/wally/suits/io/defaults_qd.cfg b/wally/suits/io/defaults_qd.cfg
index 0418e8a..fcf5c16 100644
--- a/wally/suits/io/defaults_qd.cfg
+++ b/wally/suits/io/defaults_qd.cfg
@@ -19,7 +19,7 @@
size={FILESIZE}
write_iops_log=fio_iops_log
-write_bw_log=fio_ibw_log
+write_bw_log=fio_bw_log
log_avg_msec=1000
write_hist_log=fio_lat_hist_log
log_hist_msec=1000
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index e055d98..ee34af4 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,14 +1,18 @@
+import array
import os.path
import logging
-from typing import cast
+from typing import cast, Dict
import wally
from ...utils import StopTestError, get_os, ssize2b
from ...node_interfaces import IRPCNode
-from ..itest import ThreadedTest, IterationConfig, RunTestRes
-from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams, get_log_files
+from ..itest import ThreadedTest, IterationConfig, NodeTestResults
+from ...result_classes import TimeSerie
+from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams, get_log_files, get_test_summary
from . import rpc_plugin
+from .fio_hist import expected_lat_bins
+from ...storage import Storage
logger = logging.getLogger("wally")
@@ -79,8 +83,8 @@
self.exec_folder = self.config.remote_dir
def config_node(self, node: IRPCNode) -> None:
- plugin_code = open(rpc_plugin.__file__.rsplit(".", 1)[0] + ".py", "rb").read()
- node.upload_plugin(code=plugin_code, name="fio")
+ plugin_code = open(rpc_plugin.__file__.rsplit(".", 1)[0] + ".py", "rb").read() # type: bytes
+ node.upload_plugin("fio", plugin_code)
try:
node.conn.fs.rmtree(self.config.remote_dir)
@@ -102,9 +106,6 @@
if fill_bw is not None:
logger.info("Initial fio fill bw is {} MiBps for {}".format(fill_bw, node))
- fio_config = "\n".join(map(str, self.iterations_configs))
- node.put_to_file(self.remote_task_file, fio_config.encode("utf8"))
-
def install_utils(self, node: IRPCNode) -> None:
os_info = get_os(node)
if self.use_system_fio:
@@ -131,22 +132,97 @@
def get_expected_runtime(self, iteration_info: IterationConfig) -> int:
return execution_time(cast(FioJobSection, iteration_info))
- def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
+ def prepare_iteration(self, node: IRPCNode, iter_config: IterationConfig) -> None:
+ node.put_to_file(self.remote_task_file, str(iter_config).encode("utf8"))
+
+ # TODO: get a link to substorage as a parameter
+ def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
exec_time = execution_time(cast(FioJobSection, iter_config))
+
fio_cmd_templ = "cd {exec_folder}; " + \
"{fio_path} --output-format=json --output={out_file} --alloc-size=262144 {job_file}"
- bw_log, iops_log, lat_hist_log = get_log_files(iter_config)
-
cmd = fio_cmd_templ.format(exec_folder=self.exec_folder,
fio_path=self.fio_path,
out_file=self.remote_output_file,
job_file=self.remote_task_file)
- raw_res = node.run(cmd, timeout=exec_time + max(300, exec_time))
-
- return
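+ # fio writes its json report into the --output file, so stdout is expected to be empty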
+ must_be_empty = node.run(cmd, timeout=exec_time + max(300, exec_time), check_timeout=1).strip()
- # TODO(koder): fix next error
- # raise NotImplementedError("Need to extract time from test result")
- # return raw_res, (0, 0)
+ if must_be_empty:
+ logger.error("Unexpected fio output: %r", must_be_empty)
+ res = NodeTestResults(self.__class__.__name__,
+ node.info.node_id(),
+ get_test_summary(cast(FioJobSection, iter_config)))
+
+ res.extra_logs['fio'] = node.get_file_content(self.remote_output_file)
+ substorage.store_raw(res.extra_logs['fio'], "fio_raw")
+ node.conn.fs.unlink(self.remote_output_file)
+
+ files = [name for name in node.conn.fs.listdir(self.exec_folder)]
+
+ expected_time_delta = 1000 # 1000ms == 1s, must match log_avg_msec/log_hist_msec in the job file
+ max_time_diff = 50 # 50ms == 5% of expected_time_delta
+
+ for name, path in get_log_files(cast(FioJobSection, iter_config)):
+ log_files = [fname for fname in files if fname.startswith(path)]
+ if len(log_files) != 1:
+ logger.error("Found %s files, match log pattern %s(%s) - %s",
+ len(log_files), path, name, ",".join(log_files[10:]))
+ raise StopTestError()
+
+ fname = os.path.join(self.exec_folder, log_files[0])
+ raw_result = node.get_file_content(fname) # type: bytes
+ substorage.store_raw(raw_result, "{}_raw".format(name))
+ node.conn.fs.unlink(fname)
+
+ try:
+ log_data = raw_result.decode("utf8").split("\n")
+ except UnicodeDecodeError:
+ logger.exception("Error parsing %s fio log file - can't decode it as UTF8", name)
+ raise StopTestError()
+
+ parsed = array.array('L' if name == 'lat' else 'Q')
+ prev_ts = None
+ load_start_at = None
+
+ # TODO: need to adjust vals for timedelta
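+ # Assumed line layout (fio's log format): iops/bw logs are
+ # "time_ms, value, direction, bs" CSV lines; hist logs carry the
+ # latency bins in the trailing columns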
+ for idx, line in enumerate(log_data):
+ line = line.strip()
+ if line:
+ try:
+ time_ms_s, val_s, _, *rest = line.split(",")
+ time_ms = int(time_ms_s.strip())
+ if prev_ts is None:
+ load_start_at = time_ms
+ elif abs(time_ms - prev_ts - expected_time_delta) > max_time_diff:
+ logger.warning("Too large gap in %s log at %s - %sms",
name, time_ms, time_ms - prev_ts)
+ prev_ts = time_ms
+ if name == 'lat':
+ vals = [int(i.strip()) for i in rest]
+
+ if len(vals) != expected_lat_bins:
+ logger.error("Expect {} bins in latency histogram, but found {} at time {}"
+ .format(expected_lat_bins, len(vals), time_ms_s))
+ raise StopTestError()
+
+ parsed.extend(vals)
+ else:
+ parsed.append(int(val_s.strip()))
+ except ValueError:
+ logger.exception("Error during parse %s fio log file in line %s: %r", name, idx, line)
+ raise StopTestError()
+
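+ # note: for 'lat' the parsed array is a flattened 2D series with
+ # expected_lat_bins values per time step; second_axis_size records that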
+ res.series[name] = TimeSerie(name=name,
+ raw=raw_result,
+ second_axis_size=expected_lat_bins if name == 'lat' else 1,
+ start_at=load_start_at,
+ step=expected_time_delta,
+ data=parsed)
+
+ substorage.set_array(parsed, "{}_data".format(name)) # type: ignore
+ substorage["{}_meta".format(name)] = res.series[name].meta() # type: ignore
+
+ return res
diff --git a/wally/suits/io/fio_hist.py b/wally/suits/io/fio_hist.py
new file mode 100644
index 0000000..eb5d9ee
--- /dev/null
+++ b/wally/suits/io/fio_hist.py
@@ -0,0 +1,57 @@
+from typing import List
+
+
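+# 1216 == FIO_IO_U_PLAT_GROUP_NR (19) * FIO_IO_U_PLAT_VAL (64) - the number of
+# histogram columns fio writes per hist log line (for fio versions with
+# millisecond-resolution latency histograms; see fio's stat.h)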
+expected_lat_bins = 1216
+
+
+#---------------------------- FIO HIST LOG PARSE CODE -----------------------------------------------------------------
+
+# Copy-pasted from fio/tools/hist/fiologparser_hist.py.
+# Since that code is impossible to understand or improve,
+# one can only copy such a pearl verbatim.
+
+def _plat_idx_to_val(idx: int, edge: float = 0.5, FIO_IO_U_PLAT_BITS: int = 6, FIO_IO_U_PLAT_VAL: int = 64) -> float:
+ """ Taken from fio's stat.c for calculating the latency value of a bin
+ from that bin's index.
+
+ idx : the value of the index into the histogram bins
+ edge : fractional value in the range [0,1]** indicating how far into
+ the bin we wish to compute the latency value of.
+
+ ** edge = 0.0 and 1.0 compute the lower and upper latency bounds
+ respectively of the given bin index. """
+
+ # MSB <= (FIO_IO_U_PLAT_BITS-1), cannot be rounded off. Use
+ # all bits of the sample as index
+ if idx < (FIO_IO_U_PLAT_VAL << 1):
+ return idx
+
+ # Find the group and compute the minimum value of that group
+ error_bits = (idx >> FIO_IO_U_PLAT_BITS) - 1
+ base = 1 << (error_bits + FIO_IO_U_PLAT_BITS)
+
+ # Find the bucket number within the group
+ k = idx % FIO_IO_U_PLAT_VAL
+
+ # Return the mean (if edge=0.5) of the range of the bucket
+ return base + ((k + edge) * (1 << error_bits))
+
+
+def plat_idx_to_val_coarse(idx: int, coarseness: int, edge: float = 0.5) -> float:
+ """ Converts the given *coarse* index into a non-coarse index as used by fio
+ in stat.h:plat_idx_to_val(), subsequently computing the appropriate
+ latency value for that bin.
+ """
+
+ # Multiply the index by the power-of-2 coarseness to get the bin
+ # index, with a max of 1536 bins (FIO_IO_U_PLAT_GROUP_NR = 24 in stat.h)
+ stride = 1 << coarseness
+ idx = idx * stride
+ lower = _plat_idx_to_val(idx, edge=0.0)
+ upper = _plat_idx_to_val(idx + stride, edge=1.0)
+ return lower + (upper - lower) * edge
+
+
+def get_lat_vals(columns: int = expected_lat_bins, coarseness: int = 0) -> List[float]:
+ return [plat_idx_to_val_coarse(val, coarseness) for val in range(columns)]
+
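+
+# A minimal usage sketch (not part of the original fio code): estimate the
+# mean latency of one histogram row. `hist_row` is a hypothetical list of
+# expected_lat_bins counters, parsed from a single fio hist log line.
+def approx_mean_lat(hist_row: List[int], coarseness: int = 0) -> float:
+ # weight each bin's representative latency value by its hit count
+ lat_vals = get_lat_vals(len(hist_row), coarseness)
+ total_ios = sum(hist_row)
+ if total_ios == 0:
+ return 0.0
+ return sum(lat * cnt for lat, cnt in zip(lat_vals, hist_row)) / total_ios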
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index aaf4b36..8390e3a 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -41,6 +41,7 @@
def __init__(self, name: str) -> None:
self.name = name
self.vals = OrderedDict() # type: Dict[str, Any]
+ self.summary = None # type: Optional[str]
def copy(self) -> 'FioJobSection':
return copy.deepcopy(self)
@@ -398,15 +399,22 @@
yield res
-def get_log_files(sec: FioJobSection) -> Tuple[Optional[str], Optional[str], Optional[str]]:
- return sec.vals.get('write_iops_log'), sec.vals.get('write_bw_log'), sec.vals.get('write_hist_log')
+def get_log_files(sec: FioJobSection) -> List[Tuple[str, str]]:
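+ # returns (metric name, log file name prefix) pairs; e.g. with
+ # defaults_qd.cfg: [('iops', 'fio_iops_log'), ('bw', 'fio_bw_log'), ('lat', 'fio_lat_hist_log')]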
+ res = [] # type: List[Tuple[str, str]]
+ for key, name in (('write_iops_log', 'iops'), ('write_bw_log', 'bw'), ('write_hist_log', 'lat')):
+ log = sec.vals.get(key)
+ if log is not None:
+ res.append((name, log))
+ return res
def fio_cfg_compile(source: str, fname: str, test_params: FioParams) -> Iterator[FioJobSection]:
it = parse_all_in_1(source, fname)
it = (apply_params(sec, test_params) for sec in it)
it = flatmap(process_cycles, it)
- return map(final_process, it)
+ for sec in map(final_process, it):
+ sec.summary = get_test_summary(sec)
+ yield sec
def parse_args(argv):
diff --git a/wally/suits/io/rpc_plugin.py b/wally/suits/io/rpc_plugin.py
index 306af28..98e55f0 100644
--- a/wally/suits/io/rpc_plugin.py
+++ b/wally/suits/io/rpc_plugin.py
@@ -11,7 +11,6 @@
logger = logging.getLogger("agent.fio")
-SensorsMap = {}
def check_file_prefilled(path, used_size_mb):
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index f328e13..aae475c 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -5,12 +5,12 @@
import datetime
from typing import Dict, Any, List, Optional, Tuple, cast
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, wait
from ..utils import Barrier, StopTestError, sec_to_str
from ..node_interfaces import IRPCNode
from ..storage import Storage
-from ..result_classes import RawTestResults
+from ..result_classes import NodeTestResults, TimeSerie
logger = logging.getLogger("wally")
@@ -47,6 +47,7 @@
class IterationConfig:
name = None # type: str
+ summary = None # type: str
class PerfTest:
@@ -76,9 +77,6 @@
pass
-RunTestRes = Tuple[RawTestResults, Tuple[int, int]]
-
-
class ThreadedTest(PerfTest, metaclass=abc.ABCMeta):
"""Base class for tests, which spawn separated thread for each node"""
@@ -96,11 +94,13 @@
pass
def get_not_done_stages(self, storage: Storage) -> Dict[int, IterationConfig]:
- done_stages = list(storage.list('result'))
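+ # completed iterations are stored in "result/{summary}_{run_id}/..." dirs -
+ # extract the numeric run_id suffix from each subdir name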
+ done_stages = [int(name_id.rsplit("_", 1)[1])
for is_leaf, name_id in storage.list('result')
if not is_leaf]
if len(done_stages) == 0:
start_run_id = 0
else:
- start_run_id = max(int(name) for _, name in done_stages) + 1
+ start_run_id = max(done_stages) + 1
not_in_storage = {} # type: Dict[int, IterationConfig]
@@ -143,10 +143,7 @@
return
logger.debug("Run test {} on nodes {}.".format(self.name, ",".join(self.sorted_nodes_ids)))
-
- barrier = Barrier(len(self.nodes))
-
- logger.debug("Run preparation")
+ logger.debug("Prepare nodes")
with ThreadPoolExecutor(len(self.nodes)) as pool:
list(pool.map(self.config_node, self.nodes))
@@ -163,31 +160,50 @@
for run_id, iteration_config in sorted(not_in_storage.items()):
iter_name = "Unnamed" if iteration_config is None else iteration_config.name
- logger.info("Run test iteration {} ".format(iter_name))
+ logger.info("Run test iteration %s", iter_name)
- results = [] # type: List[RunTestRes]
+ results = [] # type: List[NodeTestResults]
for idx in range(self.max_retry):
- barrier.wait()
- try:
- futures = [pool.submit(self.do_test, node, iteration_config) for node in self.nodes]
- results = [fut.result() for fut in futures]
- except EnvironmentError as exc:
- if self.max_retry - 1 == idx:
- raise StopTestError("Fio failed") from exc
- logger.exception("During fio run")
+ logger.debug("Prepare iteration %s", iter_name)
- logger.info("Sleeping %ss and retrying", self.retry_time)
- time.sleep(self.retry_time)
+ # prepare nodes for new iterations
+ futures = [pool.submit(self.prepare_iteration, node, iteration_config) for node in self.nodes]
+ wait(futures)
+
+ # run iteration
+ logger.debug("Run iteration %s", iter_name)
+ try:
+ futures = []
+ for node in self.nodes:
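+ # each node gets its own substorage, where run_iteration() dumps
+ # raw fio output and the parsed time series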
+ path = "result/{}_{}/measurement/{}".format(iteration_config.summary,
+ run_id,
+ node.info.node_id())
+ self.config.storage.clear(path)
+ mstorage = self.config.storage.sub_storage(path)
+ futures.append(pool.submit(self.run_iteration, node, iteration_config, mstorage))
+
+ results = [fut.result() for fut in futures]
+ break
+ except EnvironmentError:
+ if self.max_retry - 1 == idx:
+ logger.exception("Fio failed")
+ raise StopTestError()
+ logger.exception("During fio run")
+ logger.info("Sleeping %ss and retrying", self.retry_time)
+ time.sleep(self.retry_time)
start_times = [] # type: List[int]
stop_times = [] # type: List[int]
- mstorage = self.config.storage.sub_storage("result", str(run_id), "measurement")
- for (result, (t_start, t_stop)), node in zip(results, self.config.nodes):
- for metrics_name, data in result.items():
- mstorage[node.info.node_id(), metrics_name] = data # type: ignore
- start_times.append(t_start)
- stop_times.append(t_stop)
+ # TODO: FIX result processing - NodeTestResults
+ result = None # type: NodeTestResults
+ for result in results:
+ mstorage = self.config.storage.sub_storage("result/{}_{}/measurement/{}"
+ .format(result.summary, run_id, result.node_id))
+ serie = None # type: TimeSerie
+ for name, serie in result.series.items():
+ start_times.append(serie.start_at)
+ stop_times.append(serie.start_at + serie.step * (len(serie.data) // serie.second_axis_size))
min_start_time = min(start_times)
max_start_time = max(start_times)
@@ -222,7 +238,11 @@
pass
@abc.abstractmethod
- def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
+ def prepare_iteration(self, node: IRPCNode, iter_config: IterationConfig) -> None:
+ pass
+
+ @abc.abstractmethod
+ def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
pass
@@ -246,15 +266,16 @@
cmd += ' ' + self.config.params.get('prerun_opts', '')
node.run(cmd, timeout=self.prerun_tout)
- def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
+ def prepare_iteration(self, node: IRPCNode, iter_config: IterationConfig) -> None:
+ pass
+
+ def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
+ # TODO: have to store logs
cmd = self.join_remote(self.run_script)
cmd += ' ' + self.config.params.get('run_opts', '')
- t1 = time.time()
- res = self.parse_results(node.run(cmd, timeout=self.run_tout))
- t2 = time.time()
- return res, (int(t1), int(t2))
+ return self.parse_results(node.run(cmd, timeout=self.run_tout))
@abc.abstractmethod
- def parse_results(self, data: str) -> RawTestResults:
+ def parse_results(self, data: str) -> NodeTestResults:
pass