Move even more code to cephlib (units, node, and common helpers)
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 1895b6c..374e5a4 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,13 +1,14 @@
import os.path
import logging
-from typing import cast, Any, List, Union
+from typing import cast, Any, List, Union, Tuple, Optional
import numpy
+from cephlib.units import ssize2b, b2ssize
+from cephlib.node import IRPCNode, get_os
+
import wally
-from ...utils import StopTestError, ssize2b, b2ssize
-from ...node_interfaces import IRPCNode
-from ...node_utils import get_os
+from ...utils import StopTestError
from ..itest import ThreadedTest
from ...result_classes import TimeSeries, DataSource
from ..job import JobConfig
@@ -189,7 +190,7 @@
node.conn.fs.unlink(self.remote_output_file)
files = [name for name in node.conn.fs.listdir(self.exec_folder)]
- result = []
+ result = [] # type: List[TimeSeries]
for name, file_path, units in get_log_files(cast(FioJobConfig, job)):
log_files = [fname for fname in files if fname.startswith(file_path)]
if len(log_files) != 1:
@@ -235,19 +236,16 @@
logger.exception("Error during parse %s fio log file in line %s: %r", name, idx, line)
raise StopTestError()
- if not self.suite.keep_raw_files:
- raw_result = None
+ assert not self.suite.keep_raw_files, "keep_raw_files is not supported"
histo_bins = None if name != 'lat' else numpy.array(get_lat_vals())
-
- result.append(TimeSeries(name=name,
- raw=raw_result,
- data=numpy.array(parsed, dtype='uint64'),
- units=units,
- times=numpy.array(times, dtype='uint64'),
- time_units='ms',
- source=path(metric=name, tag='csv'),
- histo_bins=histo_bins))
+ ts = TimeSeries(data=numpy.array(parsed, dtype='uint64'),
+ units=units,
+ times=numpy.array(times, dtype='uint64'),
+ time_units='ms',
+ source=path(metric=name, tag='csv'),
+ histo_bins=histo_bins)
+ result.append(ts)
return result
def format_for_console(self, data: Any) -> str:
diff --git a/wally/suits/io/fio_job.py b/wally/suits/io/fio_job.py
index 39715ef..3d4e886 100644
--- a/wally/suits/io/fio_job.py
+++ b/wally/suits/io/fio_job.py
@@ -1,10 +1,9 @@
-import abc
import copy
from collections import OrderedDict
from typing import Optional, Iterator, Union, Dict, Tuple, NamedTuple, Any, cast
+from cephlib.units import ssize2b, b2ssize
-from ...utils import ssize2b, b2ssize
from ..job import JobConfig, JobParams
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 222589b..9ffeed8 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -9,8 +9,9 @@
from typing import Optional, Iterator, Union, Dict, Iterable, List, Tuple, NamedTuple, Any
from collections import OrderedDict
+from cephlib.units import ssize2b
+from cephlib.common import flatmap, sec_to_str
-from ...utils import sec_to_str, ssize2b, flatmap
from .fio_job import Var, FioJobConfig
SECTION = 0
@@ -87,16 +88,12 @@
fname, lineno, oline)
if line.startswith('['):
- yield CfgLine(fname, lineno, oline, SECTION,
- line[1:-1].strip(), None)
+ yield CfgLine(fname, lineno, oline, SECTION, line[1:-1].strip(), None)
elif '=' in line:
opt_name, opt_val = line.split('=', 1)
- yield CfgLine(fname, lineno, oline, SETTING,
- opt_name.strip(),
- parse_value(opt_val.strip()))
+ yield CfgLine(fname, lineno, oline, SETTING, opt_name.strip(), parse_value(opt_val.strip()))
elif line.startswith("include "):
- yield CfgLine(fname, lineno, oline, INCLUDE,
- line.split(" ", 1)[1], None)
+ yield CfgLine(fname, lineno, oline, INCLUDE, line.split(" ", 1)[1], None)
else:
yield CfgLine(fname, lineno, oline, SETTING, line, '1')
diff --git a/wally/suits/io/rpc_plugin.py b/wally/suits/io/rpc_plugin.py
index 39ed5cc..1c7fee0 100644
--- a/wally/suits/io/rpc_plugin.py
+++ b/wally/suits/io/rpc_plugin.py
@@ -42,10 +42,13 @@
assert size % 4 == 0, "File size must be proportional to 4M"
- cmd_templ = "{} --name=xxx --filename={} --direct=1 --bs=4m --size={}m --rw=write"
+ cmd_templ = "{0} --name=xxx --filename={1} --direct=1 --bs=4m --size={2}m --rw=write"
run_time = time.time()
- subprocess.check_output(cmd_templ.format(fio_path, fname, size), shell=True)
+ try:
+ subprocess.check_output(cmd_templ.format(fio_path, fname, size), shell=True)
+ except subprocess.CalledProcessError as exc:
+ raise RuntimeError("{0!s}.\nOutput: {1}".format(exc, exc.output))
run_time = time.time() - run_time
prefill_bw = None if run_time < 1.0 else int(size / run_time)
@@ -55,6 +58,6 @@
def rpc_install(name, binary):
try:
- subprocess.check_output("which {}".format(binary), shell=True)
+ subprocess.check_output("which {0}".format(binary), shell=True)
except:
- subprocess.check_output("apt-get install -y {}".format(name), shell=True)
+ subprocess.check_output("apt-get install -y {0}".format(name), shell=True)
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index ae2d960..b013796 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -8,27 +8,27 @@
rw=randread
iodepth=1
-[test_{TEST_SUMM}]
-iodepth=16
-blocksize=60k
-rw=randread
+#[test_{TEST_SUMM}]
+#iodepth=16
+#blocksize=60k
+#rw=randread
-[test_{TEST_SUMM}]
-blocksize=60k
-rw=randwrite
-iodepth=1
+#[test_{TEST_SUMM}]
+#blocksize=60k
+#rw=randwrite
+#iodepth=1
-[test_{TEST_SUMM}]
-iodepth=16
-blocksize=60k
-rw=randwrite
+#[test_{TEST_SUMM}]
+#iodepth=16
+#blocksize=60k
+#rw=randwrite
-[test_{TEST_SUMM}]
-iodepth=1
-blocksize=1m
-rw=write
+#[test_{TEST_SUMM}]
+#iodepth=1
+#blocksize=1m
+#rw=write
-[test_{TEST_SUMM}]
-iodepth=1
-blocksize=1m
-rw=read
+#[test_{TEST_SUMM}]
+#iodepth=1
+#blocksize=1m
+#rw=read
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index e848442..b2b3a54 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -2,13 +2,14 @@
import time
import logging
import os.path
-from typing import Any, List, Optional, Callable, Iterable, cast
+from typing import Any, List, Optional, Callable, Iterable, cast, Tuple
from concurrent.futures import ThreadPoolExecutor, wait
+from cephlib.node import IRPCNode
+
from ..utils import StopTestError, get_time_interval_printable_info
-from ..node_interfaces import IRPCNode
-from ..result_classes import SuiteConfig, JobConfig, TimeSeries, IResultStorage
+from ..result_classes import SuiteConfig, JobConfig, TimeSeries, IWallyStorage
logger = logging.getLogger("wally")
@@ -24,7 +25,7 @@
retry_time = 30
job_config_cls = None # type: type
- def __init__(self, storage: IResultStorage, suite: SuiteConfig, on_idle: Callable[[], None] = None) -> None:
+ def __init__(self, storage: IWallyStorage, suite: SuiteConfig, on_idle: Callable[[], None] = None) -> None:
self.suite = suite
self.stop_requested = False
self.sorted_nodes_ids = sorted(node.node_id for node in self.suite.nodes)
diff --git a/wally/suits/job.py b/wally/suits/job.py
index 5f3c764..8ef1093 100644
--- a/wally/suits/job.py
+++ b/wally/suits/job.py
@@ -2,7 +2,7 @@
from typing import Dict, Any, Tuple, cast, Union
from collections import OrderedDict
-from cephlib.storage import Storable
+from cephlib.istorage import Storable
class JobParams(metaclass=abc.ABCMeta):