blob: 78986f685fde7e59efccc343533b686403cbe4c2 [file] [log] [blame]
koder aka kdanilov4643fd62015-02-10 16:20:13 -08001import abc
koder aka kdanilovbc2c8982015-06-13 02:50:43 +03002import time
3import logging
koder aka kdanilov4643fd62015-02-10 16:20:13 -08004import os.path
koder aka kdanilov70227062016-11-26 23:23:21 +02005import datetime
koder aka kdanilov7f59d562016-12-26 01:34:23 +02006from typing import Dict, Any, List, Optional, Callable, cast
koder aka kdanilov652cd802015-04-13 12:21:07 +03007
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +02008from concurrent.futures import ThreadPoolExecutor, wait
koder aka kdanilov4643fd62015-02-10 16:20:13 -08009
koder aka kdanilov70227062016-11-26 23:23:21 +020010from ..utils import Barrier, StopTestError, sec_to_str
11from ..node_interfaces import IRPCNode
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020012from ..storage import Storage
koder aka kdanilov7f59d562016-12-26 01:34:23 +020013from ..result_classes import NodeTestResults, IStorable
14from queue import Queue
koder aka kdanilov70227062016-11-26 23:23:21 +020015
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030016
# Logger shared by this module; it is registered under the fixed "wally" name.
logger = logging.getLogger("wally")


# The module description is assigned explicitly because it follows the imports
# and therefore cannot be written as a leading docstring.
__doc__ = "Contains base classes for performance tests"
21
22
class TestInputConfig:
    """
    Plain container describing the input configuration of one test.

    test_type - test type name
    params - parameters from yaml file for this test
    run_uuid - UUID of the run (presumably used to build file names; confirm with callers)
    nodes - nodes to run tests on
    storage - storage object the test reads previous results from and writes new ones to
    remote_dir - directory on nodes to be used for local files
    """
    def __init__(self,
                 test_type: str,
                 params: Dict[str, Any],
                 run_uuid: str,
                 nodes: List[IRPCNode],
                 storage: Storage,
                 remote_dir: str) -> None:
        # What to run and with which settings.
        self.test_type = test_type
        self.params = params
        self.run_uuid = run_uuid

        # Where to run it and where to keep the outcome.
        self.nodes = nodes
        self.remote_dir = remote_dir
        self.storage = storage
koder aka kdanilov88407ff2015-05-26 15:35:57 +030047
48
class IterationConfig(IStorable):
    """Storable description of a single test iteration.

    Both attributes are class-level placeholders set to ``None``; concrete
    configs are expected to fill them in.
    """
    # Iteration name; used in log messages and stored with the test info.
    name = None  # type: str

    # Short summary string; used as part of the result path of the iteration.
    summary = None  # type: str
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030052
53
class PerfTest(metaclass=abc.ABCMeta):
    """Base class for all tests"""
    # Test name; concrete tests are expected to override it.
    name = None  # type: str

    # Retry policy consumed by ThreadedTest.run: number of attempts per
    # iteration and the pause (in seconds, passed to time.sleep) between them.
    max_retry = 3
    retry_time = 30

    def __init__(self, config: TestInputConfig, on_idle: Optional[Callable[[], None]] = None) -> None:
        """Remember the configuration and pre-compute the sorted node ids.

        config - test input configuration; its ``nodes`` list is used as is
        on_idle - optional callback without arguments; ``None`` disables it
        """
        self.config = config
        self.stop_requested = False
        self.nodes = self.config.nodes  # type: List[IRPCNode]
        self.sorted_nodes_ids = sorted(node.info.node_id() for node in self.nodes)
        self.on_idle = on_idle

    def request_stop(self) -> None:
        """Raise the ``stop_requested`` flag; nothing is interrupted here."""
        self.stop_requested = True

    def join_remote(self, path: str) -> str:
        """Return ``path`` joined onto the configured remote directory.

        Uses os.path.join, so an absolute ``path`` is returned unchanged.
        """
        return os.path.join(self.config.remote_dir, path)

    @abc.abstractmethod
    def run(self) -> None:
        """Execute the test; must be implemented by subclasses."""
        pass

    @abc.abstractmethod
    def format_for_console(self, data: Any) -> str:
        """Return a console representation of ``data``; must be implemented by subclasses."""
        pass
80
koder aka kdanilov4643fd62015-02-10 16:20:13 -080081
class ThreadedTest(PerfTest, metaclass=abc.ABCMeta):
    """Base class for tests, which spawn separated thread for each node.

    ``run`` configures all nodes, then executes every iteration that has no
    stored info yet: it prepares the nodes, runs the iteration on all nodes
    through a thread pool (with retries on EnvironmentError), checks how well
    the nodes were synchronized and writes the iteration info to storage.
    Entries of ``iterations_configs`` may be ``None`` (unnamed iteration).
    """

    # max allowed time difference between starts and stops of run of the same test on different test nodes
    # used_max_diff = max((min_run_time * max_rel_time_diff), max_time_diff)
    max_time_diff = 5
    max_rel_time_diff = 0.05

    def __init__(self, *args, **kwargs) -> None:
        PerfTest.__init__(self, *args, **kwargs)
        self.iterations_configs = [None]  # type: List[Optional[IterationConfig]]
        # Data queued by store_data() and flushed by process_storage_queue().
        self.storage_q = Queue()  # type: Any

    @abc.abstractmethod
    def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
        """Return the expected run time of an iteration, or None if unknown."""
        pass

    def _iteration_info(self, iteration_config: Optional[IterationConfig]) -> Dict[str, Any]:
        """Build the time-independent part of the info dict for an iteration.

        The same dict is stored after a run and expected on restart, so both
        places must build it through this helper. A ``None`` config is
        described as the "Unnamed" iteration with no raw config.
        """
        if iteration_config is None:
            iter_name = "Unnamed"
            raw_config = None
        else:
            iter_name = iteration_config.name
            raw_config = iteration_config.raw()

        return {
            'name': self.name,
            'iteration_name': iter_name,
            'iteration_config': raw_config,
            'params': self.config.params,
            'nodes': self.sorted_nodes_ids
        }

    def get_not_done_stages(self, storage: Storage) -> Dict[int, Optional[IterationConfig]]:
        """Return ``{run_id: iteration_config}`` for iterations absent from storage.

        For iterations already present the stored info (without begin/end
        time) must be equal to the current configuration, otherwise an
        AssertionError is raised.
        """
        not_done = {}  # type: Dict[int, Optional[IterationConfig]]

        for run_id, iteration_config in enumerate(self.iterations_configs):
            info_path = "result/{}/info".format(run_id)
            if info_path not in storage:
                not_done[run_id] = iteration_config
                continue

            info = cast(Dict[str, Any], storage.get(info_path))

            assert isinstance(info, dict), \
                "Broken storage at path {}. Expect test info dict, obtain {!r}".format(info_path, info)

            # Times differ between runs, so they are excluded from comparison.
            info = info.copy()
            info.pop('begin_time', None)
            info.pop('end_time', None)

            expected_config = self._iteration_info(iteration_config)

            assert info == expected_config, \
                ("Test info at path {} is not equal to expected config." +
                 "Maybe configuration was changed before test was restarted. " +
                 "Current cfg is {!r}, expected cfg is {!r}").format(info_path, info, expected_config)

            logger.info("Test iteration {} found in storage and will be skipped"
                        .format(expected_config['iteration_name']))

        return not_done

    def _log_expected_runtime(self, iteration_configs: Any) -> None:
        """Log the estimated total run time when every iteration reports one."""
        # +5% - is a rough estimation for additional operations
        run_times = [self.get_expected_runtime(iteration_config) for iteration_config in iteration_configs]
        if None in run_times:
            return

        expected_run_time = int(sum(run_times) * 1.05)
        exec_time_s = sec_to_str(expected_run_time)
        now_dt = datetime.datetime.now()
        end_dt = now_dt + datetime.timedelta(0, expected_run_time)
        logger.info("Entire test should takes aroud: {} and finished at {:%H:%M:%S}"
                    .format(exec_time_s, end_dt))

    def _run_with_retries(self,
                          pool: ThreadPoolExecutor,
                          iteration_config: Optional[IterationConfig],
                          iter_name: str,
                          result_path: str) -> List[NodeTestResults]:
        """Prepare and run one iteration on all nodes, retrying on EnvironmentError.

        Makes up to ``max_retry`` attempts with ``retry_time`` seconds between
        them; raises StopTestError when the last attempt fails. Returns the
        per-node results (an empty list if ``max_retry`` is 0).
        """
        for idx in range(self.max_retry):
            logger.debug("Prepare iteration %s", iter_name)

            # prepare nodes for new iterations
            # NOTE(review): results of these futures are never checked, so a
            # failed prepare_iteration goes unnoticed - confirm this is intended.
            futures = [pool.submit(self.prepare_iteration, node, iteration_config) for node in self.nodes]
            wait(futures)

            # run iteration
            logger.debug("Run iteration %s", iter_name)
            try:
                futures = []
                for node in self.nodes:
                    path = "{}/measurement/{}".format(result_path, node.info.node_id())
                    futures.append(pool.submit(self.run_iteration, node, iteration_config, path))

                return [fut.result() for fut in futures]
            except EnvironmentError:
                if self.max_retry - 1 == idx:
                    logger.exception("Fio failed")
                    raise StopTestError()
                logger.exception("During fio run")
                logger.info("Sleeping %ss and retrying", self.retry_time)
                time.sleep(self.retry_time)

        return []

    def _warn_on_time_spread(self, iter_name: str, start_times: List[int], stop_times: List[int]) -> None:
        """Warn when nodes started or stopped too far apart from each other."""
        min_start_time = min(start_times)
        max_start_time = max(start_times)
        min_stop_time = min(stop_times)
        max_stop_time = max(stop_times)

        # used_max_diff = max((min_run_time * max_rel_time_diff), max_time_diff)
        max_allowed_time_diff = int((min_stop_time - max_start_time) * self.max_rel_time_diff)
        max_allowed_time_diff = max(max_allowed_time_diff, self.max_time_diff)

        start_diff = max_start_time - min_start_time
        if start_diff > max_allowed_time_diff:
            logger.warning("Too large difference in {}:{} start time - {}. Max recommended difference is {}"
                           .format(self.name, iter_name, start_diff, max_allowed_time_diff))

        stop_diff = max_stop_time - min_stop_time
        if stop_diff > max_allowed_time_diff:
            logger.warning("Too large difference in {}:{} stop time - {}. Max recommended difference is {}"
                           .format(self.name, iter_name, stop_diff, max_allowed_time_diff))

    def run(self) -> None:
        """Run every iteration that is not yet in storage and store its info."""
        not_in_storage = self.get_not_done_stages(self.config.storage)

        if not not_in_storage:
            logger.info("All test iteration in storage already. Skip test")
            return

        logger.debug("Run test {} on nodes {}.".format(self.name, ",".join(self.sorted_nodes_ids)))
        logger.debug("Prepare nodes")

        # The pool is reused for every iteration, so all work stays inside the with-block.
        with ThreadPoolExecutor(len(self.nodes)) as pool:
            # list() forces the lazy map and re-raises the first config_node failure.
            list(pool.map(self.config_node, self.nodes))

            self._log_expected_runtime(not_in_storage.values())

            for run_id, iteration_config in sorted(not_in_storage.items()):
                test_config = self._iteration_info(iteration_config)
                iter_name = test_config['iteration_name']
                # An unnamed (None) iteration has no summary of its own.
                iter_summary = "Unnamed" if iteration_config is None else iteration_config.summary
                logger.info("Run test iteration %s", iter_name)

                current_result_path = "result/{}_{}".format(iter_summary, run_id)
                results = self._run_with_retries(pool, iteration_config, iter_name, current_result_path)

                start_times = []  # type: List[int]
                stop_times = []  # type: List[int]

                # TODO: FIX result processing - NodeTestResults
                # NOTE(review): the "stop time" below is step * len(data), i.e. it
                # does not include start_at - units are unknown here, confirm.
                for result in results:
                    for serie in result.series.values():
                        start_times.append(serie.start_at)
                        stop_times.append(serie.step * len(serie.data))

                self._warn_on_time_spread(iter_name, start_times, stop_times)

                test_config['begin_time'] = min(start_times)
                test_config['end_time'] = max(stop_times)

                # Flush data queued by worker threads before marking the iteration as done.
                self.process_storage_queue()
                self.config.storage.put(test_config, "result", str(run_id), "info")

                if "all_results" in self.config.storage:
                    all_results = self.config.storage.get("all_results")
                else:
                    all_results = []

                all_results.append([self.name, iter_summary, current_result_path])
                self.config.storage.put(all_results, "all_results")
                self.config.storage.sync()

                if self.on_idle is not None:
                    self.on_idle()

    def store_data(self, val: Any, type: str, prefix: str, *path: str) -> None:
        """Queue ``val`` for storing; ``type`` is one of 'raw', 'yaml', 'array'.

        The parameter name ``type`` shadows the builtin but is kept because
        it is part of the public signature.
        """
        self.storage_q.put((val, type, prefix, path))

    def process_storage_queue(self) -> None:
        """Write every queued item to storage using the method matching its type.

        Raises StopTestError for an unknown type.
        """
        while not self.storage_q.empty():
            value, val_type, subpath, val_path = self.storage_q.get()
            if val_type == 'raw':
                self.config.storage.put_raw(value, subpath, *val_path)
            elif val_type == 'yaml':
                self.config.storage.put(value, subpath, *val_path)
            elif val_type == 'array':
                self.config.storage.put_array(value, subpath, *val_path)
            else:
                # Report the offending type, not the path it was meant for.
                logger.error("Internal logic error - unknown data stop type {!r}".format(val_type))
                raise StopTestError()

    @abc.abstractmethod
    def config_node(self, node: IRPCNode) -> None:
        """Configure ``node`` once, before any iteration is run."""
        pass

    @abc.abstractmethod
    def prepare_iteration(self, node: IRPCNode, iter_config: IterationConfig) -> None:
        """Prepare ``node`` for the given iteration; called before every attempt."""
        pass

    @abc.abstractmethod
    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, stor_prefix: str) -> NodeTestResults:
        """Run the iteration on ``node``; ``stor_prefix`` is the node's result path."""
        pass
265
266
class TwoScriptTest(ThreadedTest, metaclass=abc.ABCMeta):
    """Test made of two scripts: a prerun script executed while a node is
    configured and a run script executed as the single, unnamed iteration.

    Required params: 'prerun_script', 'run_script'. Optional params:
    'prerun_tout' and 'run_tout' (both default to 3600), 'prerun_opts' and
    'run_opts' (both default to an empty string).
    """

    def __init__(self, *dt, **mp) -> None:
        ThreadedTest.__init__(self, *dt, **mp)
        params = self.config.params
        self.prerun_script = params['prerun_script']
        self.run_script = params['run_script']
        self.prerun_tout = params.get('prerun_tout', 3600)
        self.run_tout = params.get('run_tout', 3600)
        # Exactly one iteration, without a config object.
        self.iterations_configs = [None]

    def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
        """Run time of an arbitrary script is unknown, so always None."""
        return None

    def config_node(self, node: IRPCNode) -> None:
        """Copy both scripts to the node and execute the prerun script there."""
        for script in (self.run_script, self.prerun_script):
            node.copy_file(script, self.join_remote(script))

        prerun_opts = self.config.params.get('prerun_opts', '')
        prerun_cmd = self.join_remote(self.prerun_script) + ' ' + prerun_opts
        node.run(prerun_cmd, timeout=self.prerun_tout)

    def prepare_iteration(self, node: IRPCNode, iter_config: IterationConfig) -> None:
        """Nothing to prepare between iterations."""
        pass

    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, stor_prefix: str) -> NodeTestResults:
        """Execute the run script on the node and parse what it returned."""
        # TODO: have to store logs
        run_opts = self.config.params.get('run_opts', '')
        run_cmd = self.join_remote(self.run_script) + ' ' + run_opts
        output = node.run(run_cmd, timeout=self.run_tout)
        return self.parse_results(output)

    @abc.abstractmethod
    def parse_results(self, data: str) -> NodeTestResults:
        """Convert the output of the run script into node test results."""
        pass
299