2.0 refactoring:
* Add type annotations to most functions
* Remove the old fio run code; it moves to the RPC/pluggable mechanism
* Remove most of the sensors code; it will be moved to RPC later
* Other refactoring
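The patch below drops the run_over_ssh() closure from the tests and talks to nodes through the new ..inode.INode abstraction instead. The interface itself is not part of this diff; going only by the calls the tests make (node.connection handed to copy_paths(), node.run(cmd, timeout=...)), a minimal sketch of what it must provide looks roughly like this (everything beyond the names INode, connection and run is a guess):

    import abc


    class INode(metaclass=abc.ABCMeta):
        """Sketch of the node interface assumed by itest.py; not the real class.

        Only the members this diff actually uses are listed: `connection`
        is the transport handle passed to copy_paths(), and run() replaces
        the old run_over_ssh() closure.
        """

        connection = None  # transport handle, consumed by copy_paths()

        @abc.abstractmethod
        def run(self, cmd: str, timeout: int = 3600) -> str:
            """Execute a shell command on the node and return its output."""
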
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 0529c1f..00492c9 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -3,18 +3,21 @@
import logging
import os.path
import functools
+from typing import Dict, Any, List, Tuple, Optional
from concurrent.futures import ThreadPoolExecutor
-from wally.utils import Barrier, StopTestError
-from wally.statistic import data_property
-from wally.ssh_utils import run_over_ssh, copy_paths
+from ..utils import Barrier, StopTestError
+from ..statistic import data_property
+from ..ssh_utils import copy_paths
+from ..inode import INode
+
logger = logging.getLogger("wally")
-class TestConfig(object):
+class TestConfig:
"""
this class describes the test input configuration
@@ -25,8 +28,13 @@
nodes:[Node] - nodes to run the tests on
remote_dir:str - directory on nodes to be used for local files
"""
- def __init__(self, test_type, params, test_uuid, nodes,
- log_directory, remote_dir):
+ def __init__(self,
+ test_type: str,
+ params: Dict[str, Any],
+ test_uuid: str,
+ nodes: List[INode],
+ log_directory: str,
+ remote_dir: str):
self.test_type = test_type
self.params = params
self.test_uuid = test_uuid
@@ -35,7 +43,7 @@
self.remote_dir = remote_dir
-class TestResults(object):
+class TestResults:
"""
this class describes the test results
@@ -45,7 +53,11 @@
raw_result:Any - opaque object to store raw results
run_interval:(float, float) - test run time, used for sensors
"""
- def __init__(self, config, results, raw_result, run_interval):
+ def __init__(self,
+ config: TestConfig,
+ results: Optional[Dict[str, Any]],
+ raw_result: Any,
+ run_interval: Tuple[float, float]):
self.config = config
self.params = config.params
self.results = results
@@ -76,22 +88,22 @@
pass
-class MeasurementMatrix(object):
- """
- data:[[MeasurementResult]] - VM_COUNT x TH_COUNT matrix of MeasurementResult
- """
- def __init__(self, data, connections_ids):
- self.data = data
- self.connections_ids = connections_ids
-
- def per_vm(self):
- return self.data
-
- def per_th(self):
- return sum(self.data, [])
+# class MeasurementMatrix:
+# """
+# data:[[MeasurementResult]] - VM_COUNT x TH_COUNT matrix of MeasurementResult
+# """
+# def __init__(self, data, connections_ids):
+# self.data = data
+# self.connections_ids = connections_ids
+#
+# def per_vm(self):
+# return self.data
+#
+# def per_th(self):
+# return sum(self.data, [])
-class MeasurementResults(object):
+class MeasurementResults:
def stat(self):
return data_property(self.data)
@@ -112,7 +124,7 @@
data:[(float, float, float)] - list of (start_time, length, average_value_for_interval)
odata: original values
"""
- def __init__(self, data):
+ def __init__(self, data: List[Tuple[float, float, float]]):
assert len(data) > 0
self.odata = data[:]
self.data = []
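For reference, the (start_time, length, average_value_for_interval) format from the docstring above, with made-up numbers; this is the shape of the `data` argument, not a claim about what the constructor stores internally:

    # three one-second intervals starting at t = 0.0 (invented values)
    raw = [(0.0, 1.0, 10.0),
           (1.0, 1.0, 12.0),
           (2.0, 1.0, 11.0)]
    ts = TimeSeriesValue(raw)
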
@@ -123,13 +135,13 @@
cstart = nstart
@property
- def values(self):
+ def values(self) -> List[float]:
return [val[2] for val in self.data]
- def average_interval(self):
+ def average_interval(self) -> float:
return float(sum([val[1] for val in self.data])) / len(self.data)
- def skip(self, seconds):
+ def skip(self, seconds: float) -> 'TimeSeriesValue':
nres = []
for start, ln, val in self.data:
nstart = start + ln - seconds
@@ -137,7 +149,7 @@
nres.append([nstart, val])
return self.__class__(nres)
- def derived(self, tdelta):
+ def derived(self, tdelta: float) -> 'TimeSeriesValue':
end = self.data[-1][0] + self.data[-1][1]
tdelta = float(tdelta)
@@ -166,7 +178,7 @@
return self.__class__(res)
-class PerfTest(object):
+class PerfTest:
"""
Root base class for all tests
config:TestConfig - test configuration
@@ -176,41 +188,32 @@
self.config = config
self.stop_requested = False
- def request_stop(self):
+ def request_stop(self) -> None:
self.stop_requested = True
- def join_remote(self, path):
+ def join_remote(self, path: str) -> str:
return os.path.join(self.config.remote_dir, path)
@classmethod
@abc.abstractmethod
- def load(cls, path):
+ def load(cls, path: str):
pass
@abc.abstractmethod
- def run(self):
+ def run(self) -> List[TestResults]:
pass
@abc.abstractmethod
- def format_for_console(cls, data):
+ def format_for_console(cls, data: Any) -> str:
pass
-def run_on_node(node):
- def closure(*args, **kwargs):
- return run_over_ssh(node.connection,
- *args,
- node=node.get_conn_id(),
- **kwargs)
- return closure
-
-
class ThreadedTest(PerfTest):
"""
Base class for tests that spawn a separate thread for each node
"""
- def run(self):
+ def run(self) -> List[TestResults]:
barrier = Barrier(len(self.config.nodes))
th_test_func = functools.partial(self.th_test_func, barrier)
@@ -218,44 +221,27 @@
return list(pool.map(th_test_func, self.config.nodes))
@abc.abstractmethod
- def do_test(self, node):
+ def do_test(self, node: INode) -> TestResults:
pass
- def th_test_func(self, barrier, node):
- logger.debug("Starting {0} test on {1} node".format(self.__class__.__name__,
- node.conn_url))
-
- logger.debug("Run preparation for {0}".format(node.get_conn_id()))
+ def th_test_func(self, barrier: Barrier, node: INode) -> TestResults:
+ test_name = self.__class__.__name__
+ logger.debug("Starting {} test on {}".format(test_name , node))
+ logger.debug("Run test preparation on {}".format(node))
self.pre_run(node)
+
+ # wait till all threads are ready
barrier.wait()
+
+ logger.debug("Run test on {}".format(node))
try:
- logger.debug("Run test for {0}".format(node.get_conn_id()))
return self.do_test(node)
- except StopTestError as exc:
- pass
+ except StopTestError:
+ # let a StopTestError from do_test() propagate as-is,
+ # so it is not wrapped into a second StopTestError below
+ raise
except Exception as exc:
- msg = "In test {0} for node {1}".format(self, node.get_conn_id())
+ msg = "In test {} for {}".format(test_name, node)
logger.exception(msg)
- exc = StopTestError(msg, exc)
+ raise StopTestError(msg) from exc
- try:
- self.cleanup()
- except StopTestError as exc1:
- if exc is None:
- exc = exc1
- except Exception as exc1:
- if exc is None:
- msg = "Duringf cleanup - in test {0} for node {1}".format(self, node)
- logger.exception(msg)
- exc = StopTestError(msg, exc)
-
- if exc is not None:
- raise exc
-
- def pre_run(self, node):
- pass
-
- def cleanup(self, node):
+ def pre_run(self, node: INode) -> None:
pass
@@ -269,25 +255,22 @@
self.prerun_tout = self.config.params.get('prerun_tout', 3600)
self.run_tout = self.config.params.get('run_tout', 3600)
- def get_remote_for_script(self, script):
- return os.path.join(self.remote_dir,
- os.path.basename(script))
+ def get_remote_for_script(self, script: str) -> str:
+ return os.path.join(self.remote_dir, os.path.basename(script))
- def pre_run(self, node):
+ def pre_run(self, node: INode) -> None:
copy_paths(node.connection,
- {
- self.run_script: self.get_remote_for_script(self.run_script),
- self.prerun_script: self.get_remote_for_script(self.prerun_script),
- })
+ {self.run_script: self.get_remote_for_script(self.run_script),
+ self.prerun_script: self.get_remote_for_script(self.prerun_script)})
cmd = self.get_remote_for_script(self.prerun_script)
cmd += ' ' + self.config.params.get('prerun_opts', '')
- run_on_node(node)(cmd, timeout=self.prerun_tout)
+ node.run(cmd, timeout=self.prerun_tout)
- def do_test(self, node):
+ def do_test(self, node: INode) -> TestResults:
cmd = self.get_remote_for_script(self.run_script)
cmd += ' ' + self.config.params.get('run_opts', '')
t1 = time.time()
- res = run_on_node(node)(cmd, timeout=self.run_tout)
+ res = node.run(cmd, timeout=self.run_tout)
t2 = time.time()
return TestResults(self.config, None, res, (t1, t2))
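For context, the script runner above is configured entirely through config.params; these are the keys this diff reads, with invented example values (both timeouts default to 3600 seconds when absent):

    # sketch: params consumed by the two-script runner in this file
    params = {
        "prerun_opts": "--prepare",   # appended to the prepare-script command
        "run_opts": "--runtime 60",   # appended to the run-script command
        "prerun_tout": 3600,          # node.run() timeout for pre_run(), seconds
        "run_tout": 3600,             # node.run() timeout for do_test(), seconds
    }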