# Source blob: f328e13ec1cdc288b1e5ecd428e9ee24b89d3167
import abc
import time
import logging
import os.path
import datetime
from typing import Dict, Any, List, Optional, Tuple, cast
from concurrent.futures import ThreadPoolExecutor

from ..utils import Barrier, StopTestError, sec_to_str
from ..node_interfaces import IRPCNode
from ..storage import Storage
from ..result_classes import RawTestResults


# Module-wide logger used by all test classes defined below.
logger = logging.getLogger("wally")


__doc__ = "Contains base classes for performance tests"


class TestInputConfig:
    """
    This class describes test input configuration.

    test_type - test type name
    params - parameters from yaml file for this test
    run_uuid - UUID of the run (presumably used to create file names & Co)
    nodes - nodes to run tests on
    storage - storage object the test results are written to
    remote_dir - directory on nodes to be used for local files
    """
    def __init__(self,
                 test_type: str,
                 params: Dict[str, Any],
                 run_uuid: str,
                 nodes: List[IRPCNode],
                 storage: Storage,
                 remote_dir: str) -> None:
        self.test_type = test_type
        self.params = params
        self.run_uuid = run_uuid
        self.nodes = nodes
        self.storage = storage
        self.remote_dir = remote_dir


class IterationConfig:
    """Configuration of a single test iteration; `name` identifies the iteration."""
    name = None  # type: str


class PerfTest:
    """Base class for all tests"""
    name = None  # type: str

    # how many times a failed run is attempted and the pause (seconds) between attempts
    max_retry = 3
    retry_time = 30

    def __init__(self, config: TestInputConfig) -> None:
        """Remember the config and cache the node list and the sorted node ids."""
        self.config = config
        self.stop_requested = False
        self.nodes = self.config.nodes  # type: List[IRPCNode]
        self.sorted_nodes_ids = sorted(node.info.node_id() for node in self.nodes)

    def request_stop(self) -> None:
        """Set the `stop_requested` flag."""
        self.stop_requested = True

    def join_remote(self, path: str) -> str:
        """Return `path` joined to the configured remote directory."""
        return os.path.join(self.config.remote_dir, path)

    # NOTE: this class is declared without an ABC metaclass, so the
    # abstractmethod markers below do not block instantiation of PerfTest itself.
    @abc.abstractmethod
    def run(self) -> None:
        """Execute the test; must be provided by subclasses."""
        pass

    @abc.abstractmethod
    def format_for_console(self, data: Any) -> str:
        """Return a console representation of `data`; must be provided by subclasses."""
        pass


# Result of one per-node run: (raw results, (start_time, stop_time))
RunTestRes = Tuple[RawTestResults, Tuple[int, int]]


class ThreadedTest(PerfTest, metaclass=abc.ABCMeta):
    """Base class for tests, which spawn separated thread for each node"""

    # max allowed time difference between starts and stops of run of the same test on different test nodes
    # used_max_diff = max((min_run_time * max_rel_time_diff), max_time_diff)
    max_time_diff = 5
    max_rel_time_diff = 0.05

    def __init__(self, config: TestInputConfig) -> None:
        PerfTest.__init__(self, config)
        # a single unnamed iteration by default; subclasses may replace the list
        self.iterations_configs = [None]  # type: List[Optional[IterationConfig]]

    @abc.abstractmethod
    def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
        """Return the expected run time of one iteration, or None when it is unknown."""
        pass

    def _make_info(self, iter_name: str, iteration_config: Optional[IterationConfig]) -> Dict[str, Any]:
        """Build the time-independent part of the info record stored for an iteration."""
        return {
            'name': self.name,
            'iteration_name': iter_name,
            'iteration_config': iteration_config,
            'params': self.config.params,
            'nodes': self.sorted_nodes_ids,
        }

    def get_not_done_stages(self, storage: Storage) -> Dict[int, IterationConfig]:
        """
        Return {run_id: iteration_config} for iterations which have no info record in storage.

        Iterations are examined starting right after the largest run id listed
        under 'result'. An info record found in storage must match the current
        test configuration, otherwise AssertionError is raised.
        """
        done_stages = list(storage.list('result'))
        if done_stages:
            start_run_id = max(int(name) for _, name in done_stages) + 1
        else:
            start_run_id = 0

        not_in_storage = {}  # type: Dict[int, IterationConfig]

        for run_id, iteration_config in enumerate(self.iterations_configs[start_run_id:], start_run_id):
            info_path = "result/{}/info".format(run_id)
            if info_path not in storage:
                not_in_storage[run_id] = iteration_config
                continue

            info = cast(Dict[str, Any], storage[info_path])  # type: Dict[str, Any]

            assert isinstance(info, dict), \
                "Broken storage at path {}. Expect test info dict, obtain {!r}".format(info_path, info)

            # timestamps differ from run to run, so they are excluded from the comparison
            info = info.copy()
            info.pop('begin_time', None)
            info.pop('end_time', None)

            iter_name = "Unnamed" if iteration_config is None else iteration_config.name
            expected_config = self._make_info(iter_name, iteration_config)

            assert info == expected_config, \
                ("Test info at path {} is not equal to expected config. " +
                 "Maybe configuration was changed before test was restarted. " +
                 "Current cfg is {!r}, expected cfg is {!r}").format(info_path, info, expected_config)

            logger.info("Test iteration {} found in storage and will be skipped".format(iter_name))

        return not_in_storage

    def _check_time_spread(self, iter_name: str, start_times: List[int], stop_times: List[int]) -> None:
        """Log a warning when nodes started or stopped too far apart from each other."""
        start_spread = max(start_times) - min(start_times)
        stop_spread = max(stop_times) - min(stop_times)

        # shortest interval during which all nodes were running simultaneously
        min_run_time = min(stop_times) - max(start_times)
        max_allowed_time_diff = max(int(min_run_time * self.max_rel_time_diff), self.max_time_diff)

        if start_spread > max_allowed_time_diff:
            logger.warning("Too large difference in {}:{} start time - {}. Max recommended difference is {}"
                           .format(self.name, iter_name, start_spread, max_allowed_time_diff))

        if stop_spread > max_allowed_time_diff:
            logger.warning("Too large difference in {}:{} stop time - {}. Max recommended difference is {}"
                           .format(self.name, iter_name, stop_spread, max_allowed_time_diff))

    def run(self) -> None:
        """
        Run every iteration that is not in storage yet on all nodes in parallel.

        Nodes are configured once via config_node, then each iteration is
        executed with do_test on every node (retried up to max_retry times on
        EnvironmentError). Per-node measurements and the iteration info record
        are written to the configured storage.

        Raises StopTestError when the last retry attempt fails as well.
        """
        not_in_storage = self.get_not_done_stages(self.config.storage)

        if not not_in_storage:
            logger.info("All test iteration in storage already. Skip test")
            return

        logger.debug("Run test {} on nodes {}.".format(self.name, ",".join(self.sorted_nodes_ids)))

        barrier = Barrier(len(self.nodes))

        logger.debug("Run preparation")

        with ThreadPoolExecutor(len(self.nodes)) as pool:
            list(pool.map(self.config_node, self.nodes))

            # +5% - is a rough estimation for additional operations
            run_times = [self.get_expected_runtime(iteration_config) for iteration_config in not_in_storage.values()]
            if None not in run_times:
                expected_run_time = int(sum(run_times) * 1.05)
                exec_time_s = sec_to_str(expected_run_time)
                now_dt = datetime.datetime.now()
                end_dt = now_dt + datetime.timedelta(0, expected_run_time)
                logger.info("Entire test should takes aroud: {} and finished at {:%H:%M:%S}"
                            .format(exec_time_s, end_dt))

            for run_id, iteration_config in sorted(not_in_storage.items()):
                iter_name = "Unnamed" if iteration_config is None else iteration_config.name
                logger.info("Run test iteration {} ".format(iter_name))

                results = []  # type: List[RunTestRes]
                for idx in range(self.max_retry):
                    # NOTE(review): wait() is called from the coordinating thread only;
                    # confirm against the Barrier implementation that this cannot block.
                    barrier.wait()
                    try:
                        futures = [pool.submit(self.do_test, node, iteration_config) for node in self.nodes]
                        results = [fut.result() for fut in futures]
                    except EnvironmentError as exc:
                        if self.max_retry - 1 == idx:
                            raise StopTestError("Fio failed") from exc
                        logger.exception("During fio run")
                    else:
                        # all nodes finished successfully - no retry needed
                        break

                    logger.info("Sleeping %ss and retrying", self.retry_time)
                    time.sleep(self.retry_time)

                start_times = []  # type: List[int]
                stop_times = []  # type: List[int]

                mstorage = self.config.storage.sub_storage("result", str(run_id), "measurement")
                for (result, (t_start, t_stop)), node in zip(results, self.config.nodes):
                    for metrics_name, data in result.items():
                        mstorage[node.info.node_id(), metrics_name] = data  # type: ignore
                    start_times.append(t_start)
                    stop_times.append(t_stop)

                self._check_time_spread(iter_name, start_times, stop_times)

                test_config = self._make_info(iter_name, iteration_config)
                test_config['begin_time'] = min(start_times)
                test_config['end_time'] = max(stop_times)

                self.config.storage["result", str(run_id), "info"] = test_config  # type: ignore

    @abc.abstractmethod
    def config_node(self, node: IRPCNode) -> None:
        """Prepare `node` for the test; called once per node before the iterations start."""
        pass

    @abc.abstractmethod
    def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
        """Run one iteration on `node`; return (raw results, (start_time, stop_time))."""
        pass


class TwoScriptTest(ThreadedTest, metaclass=abc.ABCMeta):
    """
    Test made of two scripts taken from the test params: 'prerun_script' is
    executed while a node is configured, 'run_script' is executed as the test
    itself and its output is handed to parse_results.
    """

    def __init__(self, *dt, **mp) -> None:
        ThreadedTest.__init__(self, *dt, **mp)
        self.prerun_script = self.config.params['prerun_script']
        self.run_script = self.config.params['run_script']
        # timeouts are passed to node.run(); 3600 is used when not set in params
        self.prerun_tout = self.config.params.get('prerun_tout', 3600)
        self.run_tout = self.config.params.get('run_tout', 3600)
        self.iterations_configs = [None]

    def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
        """Run time of an arbitrary script is unknown, so always return None."""
        return None

    def config_node(self, node: IRPCNode) -> None:
        """Copy both scripts to `node` and execute the prerun script there."""
        node.copy_file(self.run_script, self.join_remote(self.run_script))
        node.copy_file(self.prerun_script, self.join_remote(self.prerun_script))

        cmd = self.join_remote(self.prerun_script)
        cmd += ' ' + self.config.params.get('prerun_opts', '')
        node.run(cmd, timeout=self.prerun_tout)

    def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
        """Execute the run script on `node`; return parsed results and (start, stop) times."""
        cmd = self.join_remote(self.run_script)
        cmd += ' ' + self.config.params.get('run_opts', '')
        t1 = time.time()
        res = self.parse_results(node.run(cmd, timeout=self.run_tout))
        t2 = time.time()
        return res, (int(t1), int(t2))

    @abc.abstractmethod
    def parse_results(self, data: str) -> RawTestResults:
        """Convert the run script output into raw test results."""
        pass