2.0 is on the way
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 7004b8e..6d1eeee 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -2,38 +2,43 @@
import time
import logging
import os.path
-import functools
-from typing import Dict, Any, List, Tuple
+import datetime
+from typing import Dict, Any, List, Optional, Tuple, cast
from concurrent.futures import ThreadPoolExecutor
-from ..utils import Barrier, StopTestError
-from ..statistic import data_property
-from ..inode import INode
+from ..utils import Barrier, StopTestError, sec_to_str
+from ..node_interfaces import IRPCNode
from ..storage import Storage
+from ..result_classes import RawTestResults
+
+import agent
logger = logging.getLogger("wally")
-class TestConfig:
+__doc__ = "Contains base classes for performance tests"
+
+
+class TestInputConfig:
"""
this class describe test input configuration
- test_type:str - test type name
- params:{str:Any} - parameters from yaml file for this test
- test_uuid:str - UUID to be used to create filenames and Co
- log_directory:str - local directory to store results
- nodes:[Node] - node to run tests on
- remote_dir:str - directory on nodes to be used for local files
+ test_type - test type name
+ params - parameters from yaml file for this test
+ test_uuid - UUID to be used to create file names & Co
+ log_directory - local directory to store results
+ nodes - nodes to run tests on
+ remote_dir - directory on nodes to be used for local files
"""
def __init__(self,
test_type: str,
params: Dict[str, Any],
run_uuid: str,
- nodes: List[INode],
+ nodes: List[IRPCNode],
storage: Storage,
- remote_dir: str):
+ remote_dir: str) -> None:
self.test_type = test_type
self.params = params
self.run_uuid = run_uuid
@@ -42,150 +47,21 @@
self.remote_dir = remote_dir
-class TestResults:
- """
- this class describe test results
-
- config:TestConfig - test config object
- params:dict - parameters from yaml file for this test
- results:{str:MeasurementMesh} - test results object
- raw_result:Any - opaque object to store raw results
- run_interval:(float, float) - test tun time, used for sensors
- """
- def __init__(self,
- config: TestConfig,
- results: Dict[str, Any],
- raw_result: Any,
- run_interval: Tuple[float, float]):
- self.config = config
- self.params = config.params
- self.results = results
- self.raw_result = raw_result
- self.run_interval = run_interval
-
- def __str__(self):
- res = "{0}({1}):\n results:\n".format(
- self.__class__.__name__,
- self.summary())
-
- for name, val in self.results.items():
- res += " {0}={1}\n".format(name, val)
-
- res += " params:\n"
-
- for name, val in self.params.items():
- res += " {0}={1}\n".format(name, val)
-
- return res
-
- @abc.abstractmethod
- def summary(self):
- pass
-
- @abc.abstractmethod
- def get_yamable(self):
- pass
-
-
-# class MeasurementMatrix:
-# """
-# data:[[MeasurementResult]] - VM_COUNT x TH_COUNT matrix of MeasurementResult
-# """
-# def __init__(self, data, connections_ids):
-# self.data = data
-# self.connections_ids = connections_ids
-#
-# def per_vm(self):
-# return self.data
-#
-# def per_th(self):
-# return sum(self.data, [])
-
-
-class MeasurementResults:
- def stat(self):
- return data_property(self.data)
-
- def __str__(self):
- return 'TS([' + ", ".join(map(str, self.data)) + '])'
-
-
-class SimpleVals(MeasurementResults):
- """
- data:[float] - list of values
- """
- def __init__(self, data):
- self.data = data
-
-
-class TimeSeriesValue(MeasurementResults):
- """
- data:[(float, float, float)] - list of (start_time, lenght, average_value_for_interval)
- odata: original values
- """
- def __init__(self, data: List[Tuple[float, float, float]]):
- assert len(data) > 0
- self.odata = data[:]
- self.data = []
-
- cstart = 0
- for nstart, nval in data:
- self.data.append((cstart, nstart - cstart, nval))
- cstart = nstart
-
- @property
- def values(self) -> List[float]:
- return [val[2] for val in self.data]
-
- def average_interval(self) -> float:
- return float(sum([val[1] for val in self.data])) / len(self.data)
-
- def skip(self, seconds) -> 'TimeSeriesValue':
- nres = []
- for start, ln, val in self.data:
- nstart = start + ln - seconds
- if nstart > 0:
- nres.append([nstart, val])
- return self.__class__(nres)
-
- def derived(self, tdelta) -> 'TimeSeriesValue':
- end = self.data[-1][0] + self.data[-1][1]
- tdelta = float(tdelta)
-
- ln = end / tdelta
-
- if ln - int(ln) > 0:
- ln += 1
-
- res = [[tdelta * i, 0.0] for i in range(int(ln))]
-
- for start, lenght, val in self.data:
- start_idx = int(start / tdelta)
- end_idx = int((start + lenght) / tdelta)
-
- for idx in range(start_idx, end_idx + 1):
- rstart = tdelta * idx
- rend = tdelta * (idx + 1)
-
- intersection_ln = min(rend, start + lenght) - max(start, rstart)
- if intersection_ln > 0:
- try:
- res[idx][1] += val * intersection_ln / tdelta
- except IndexError:
- raise
-
- return self.__class__(res)
+class IterationConfig:
+ name = None # type: str
class PerfTest:
- """
- Very base class for tests
- config:TestConfig - test configuration
- stop_requested:bool - stop for test requested
- """
- def __init__(self, config):
+ """Base class for all tests"""
+ name = None # type: str
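+    # failed iterations are retried up to max_retry times, with retry_time seconds between attempts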
+ max_retry = 3
+ retry_time = 30
+
+ def __init__(self, config: TestInputConfig) -> None:
self.config = config
self.stop_requested = False
+ self.nodes = self.config.nodes # type: List[IRPCNode]
+ self.sorted_nodes_ids = sorted(node.info.node_id() for node in self.nodes)
def request_stop(self) -> None:
self.stop_requested = True
@@ -193,13 +69,8 @@
def join_remote(self, path: str) -> str:
return os.path.join(self.config.remote_dir, path)
- @classmethod
@abc.abstractmethod
- def load(cls, path: str):
- pass
-
- @abc.abstractmethod
- def run(self):
+ def run(self, storage: Storage) -> None:
pass
@abc.abstractmethod
@@ -207,69 +78,182 @@
pass
-class ThreadedTest(PerfTest):
- """
- Base class for tests, which spawn separated thread for each node
- """
+RunTestRes = Tuple[RawTestResults, Tuple[int, int]]
- def run(self) -> List[TestResults]:
- barrier = Barrier(len(self.config.nodes))
- th_test_func = functools.partial(self.th_test_func, barrier)
- with ThreadPoolExecutor(len(self.config.nodes)) as pool:
- return list(pool.map(th_test_func, self.config.nodes))
+class ThreadedTest(PerfTest, metaclass=abc.ABCMeta):
+ """Base class for tests, which spawn separated thread for each node"""
+
+    # max allowed time difference between starts and stops of the same test run on different test nodes
+    # used_max_diff = max(min_run_time * max_rel_time_diff, max_time_diff)
+ max_time_diff = 5
+ max_rel_time_diff = 0.05
+
+ def __init__(self, config: TestInputConfig) -> None:
+ PerfTest.__init__(self, config)
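+        # a single None entry means the test has one unnamed iteration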
+ self.iterations_configs = [None] # type: List[Optional[IterationConfig]]
@abc.abstractmethod
- def do_test(self, node: INode) -> TestResults:
+ def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
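+        """Return expected iteration run time in seconds, or None if unknown"""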
pass
- def th_test_func(self, barrier: Barrier, node: INode) -> TestResults:
- test_name = self.__class__.__name__
- logger.debug("Starting {} test on {}".format(test_name , node))
- logger.debug("Run test preparation on {}".format(node))
- self.pre_run(node)
+ def get_not_done_stages(self, storage: Storage) -> Dict[int, IterationConfig]:
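+        """Return iteration configs which have no results in storage yet, keyed by run id"""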
+        # start numbering after any test runs already present in storage (0 for empty storage)
+        start_run_id = max((int(name) for _, name in storage.list('result')), default=-1) + 1
+ not_in_storage = {} # type: Dict[int, IterationConfig]
+ for run_id, iteration_config in enumerate(self.iterations_configs, start_run_id):
+ info_path = "result/{}/info".format(run_id)
+ if info_path in storage:
+ info = cast(Dict[str, Any], storage[info_path]) # type: Dict[str, Any]
- # wait till all thread became ready
- barrier.wait()
+ assert isinstance(info, dict), \
+ "Broken storage at path {}. Expect test info dict, obtain {!r}".format(info_path, info)
- logger.debug("Run test on {}".format(node))
- try:
- return self.do_test(node)
- except Exception as exc:
- msg = "In test {} for {}".format(test_name, node)
- logger.exception(msg)
- raise StopTestError(msg) from exc
+ info = info.copy()
+ del info['begin_time']
+ del info['end_time']
- def pre_run(self, node: INode) -> None:
+ iter_name = "Unnamed" if iteration_config is None else iteration_config.name
+ expected_config = {
+ 'name': self.name,
+ 'iteration_name': iter_name,
+ 'iteration_config': iteration_config,
+ 'params': self.config.params,
+ 'nodes': self.sorted_nodes_ids
+ }
+
+ assert info == expected_config, \
+ "Test info at path {} is not equal to expected config." + \
+ "Maybe configuration was changed before test was restarted. " + \
+ "Current cfg is {!r}, expected cfg is {!r}".format(info_path, info, expected_config)
+
+ logger.info("Test iteration {} found in storage and will be skipped".format(iter_name))
+ else:
+ not_in_storage[run_id] = iteration_config
+ return not_in_storage
+
+ def run(self, storage: Storage) -> None:
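+        """Run all not-yet-completed test iterations on all nodes and store results and metadata"""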
+ not_in_storage = self.get_not_done_stages(storage)
+
+ if not not_in_storage:
+ logger.info("All test iteration in storage already. Skip test")
+ return
+
+ logger.debug("Run test {} on nodes {}.".format(self.name, ",".join(self.sorted_nodes_ids)))
+
+ barrier = Barrier(len(self.nodes))
+
+ logger.debug("Run preparation")
+
+        # NOTE: the pool is reused below for the per-iteration test runs,
+        # so it must stay alive until all iterations are done
+        pool = ThreadPoolExecutor(len(self.nodes))
+        list(pool.map(self.config_node, self.nodes))
+
+        # +5% is a rough estimate for additional operations
+ run_times = [self.get_expected_runtime(iteration_config) for iteration_config in not_in_storage.values()]
+ if None not in run_times:
+ expected_run_time = int(sum(run_times) * 1.05)
+ exec_time_s = sec_to_str(expected_run_time)
+ now_dt = datetime.datetime.now()
+ end_dt = now_dt + datetime.timedelta(0, expected_run_time)
+ logger.info("Entire test should takes aroud: {} and finished at {:%H:%M:%S}"
+ .format(exec_time_s, end_dt))
+
+ for run_id, iteration_config in sorted(not_in_storage.items()):
+ iter_name = "Unnamed" if iteration_config is None else iteration_config.name
+ logger.info("Run test iteration {} ".format(iter_name))
+
+ results = [] # type: List[RunTestRes]
+ for idx in range(self.max_retry):
+ barrier.wait()
+ try:
+ futures = [pool.submit(self.do_test, node, iteration_config) for node in self.nodes]
+ results = [fut.result() for fut in futures]
+ except (EnvironmentError, agent.RPCError) as exc:
+ if self.max_retry - 1 == idx:
+ raise StopTestError("Fio failed") from exc
+ logger.exception("During fio run")
+ else:
+ if all(results):
+ break
+
+ logger.info("Sleeping %ss and retrying", self.retry_time)
+ time.sleep(self.retry_time)
+
+ start_times = [] # type: List[int]
+ stop_times = [] # type: List[int]
+
+ for (result, (t_start, t_stop)), node in zip(results, self.config.nodes):
+ for metrics_name, data in result.items():
+ path = "result/{}/measurement/{}/{}".format(run_id, node.info.node_id(), metrics_name)
+ storage[path] = data # type: ignore
+ start_times.append(t_start)
+ stop_times.append(t_stop)
+
+ min_start_time = min(start_times)
+ max_start_time = max(start_times)
+ min_stop_time = min(stop_times)
+ max_stop_time = max(stop_times)
+
+ max_allowed_time_diff = int((min_stop_time - max_start_time) * self.max_rel_time_diff)
+ max_allowed_time_diff = max(max_allowed_time_diff, self.max_time_diff)
+
+            if max_start_time - min_start_time > max_allowed_time_diff:
+                logger.warning("Too large difference in {}:{} start times - {}. Max recommended difference is {}"
+                               .format(self.name, iter_name, max_start_time - min_start_time, max_allowed_time_diff))
+
+            if max_stop_time - min_stop_time > max_allowed_time_diff:
+                logger.warning("Too large difference in {}:{} stop times - {}. Max recommended difference is {}"
+                               .format(self.name, iter_name, max_stop_time - min_stop_time, max_allowed_time_diff))
+
+ test_config = {
+ 'name': self.name,
+ 'iteration_name': iter_name,
+ 'iteration_config': iteration_config,
+ 'params': self.config.params,
+ 'nodes': self.sorted_nodes_ids,
+ 'begin_time': min_start_time,
+ 'end_time': max_stop_time
+ }
+
+ storage["result/{}/info".format(run_id)] = test_config # type: ignore
+
+ @abc.abstractmethod
+ def config_node(self, node: IRPCNode) -> None:
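+        """Prepare the node for test iterations"""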
+ pass
+
+ @abc.abstractmethod
+ def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
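+        """Run one test iteration on the node, return raw results and (start, stop) unix times"""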
pass
-class TwoScriptTest(ThreadedTest):
- def __init__(self, *dt, **mp):
+class TwoScriptTest(ThreadedTest, metaclass=abc.ABCMeta):
+ def __init__(self, *dt, **mp) -> None:
ThreadedTest.__init__(self, *dt, **mp)
- self.remote_dir = '/tmp'
self.prerun_script = self.config.params['prerun_script']
self.run_script = self.config.params['run_script']
-
self.prerun_tout = self.config.params.get('prerun_tout', 3600)
self.run_tout = self.config.params.get('run_tout', 3600)
+ self.iterations_configs = [None]
- def get_remote_for_script(self, script: str) -> str:
- return os.path.join(self.remote_dir, os.path.basename(script))
+ def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
+ return None
- def pre_run(self, node: INode) -> None:
- copy_paths(node.connection,
- {self.run_script: self.get_remote_for_script(self.run_script),
- self.prerun_script: self.get_remote_for_script(self.prerun_script)})
+ def config_node(self, node: IRPCNode) -> None:
+        # keep only the script file name on the remote side (scripts may be given by local path)
+        node.copy_file(self.run_script, self.join_remote(os.path.basename(self.run_script)))
+        node.copy_file(self.prerun_script, self.join_remote(os.path.basename(self.prerun_script)))
- cmd = self.get_remote_for_script(self.prerun_script)
+        cmd = self.join_remote(os.path.basename(self.prerun_script))
cmd += ' ' + self.config.params.get('prerun_opts', '')
node.run(cmd, timeout=self.prerun_tout)
- def do_test(self, node: INode) -> TestResults:
- cmd = self.get_remote_for_script(self.run_script)
+ def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
+        cmd = self.join_remote(os.path.basename(self.run_script))
cmd += ' ' + self.config.params.get('run_opts', '')
t1 = time.time()
- res = node.run(cmd, timeout=self.run_tout)
+ res = self.parse_results(node.run(cmd, timeout=self.run_tout))
t2 = time.time()
- return TestResults(self.config, None, res, (t1, t2))
+ return res, (int(t1), int(t2))
+
+ @abc.abstractmethod
+ def parse_results(self, data: str) -> RawTestResults:
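+        """Parse raw output of the run script into RawTestResults"""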
+ pass
+
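
For reference, a minimal sketch of what a concrete TwoScriptTest subclass could look like against the new interface. The class name, the one-metric-per-line output format, and the assumption that RawTestResults is dict-like (as implied by result.items() in ThreadedTest.run) are illustrative only; prerun_script, run_script and their options still come from the yaml params as before.

from typing import Dict

from wally.result_classes import RawTestResults
from wally.suits.itest import TwoScriptTest


class DummyScriptTest(TwoScriptTest):
    # hypothetical subclass, for illustration only
    name = "dummy_script"

    def parse_results(self, data: str) -> RawTestResults:
        # assume the run script prints one "<metric> <value>" pair per line,
        # and that RawTestResults behaves like a mapping from metric name to data
        res = {}  # type: Dict[str, float]
        for line in data.splitlines():
            if line.strip():
                metric, value = line.split(None, 1)
                res[metric] = float(value)
        return res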