blob: 446ad696c7a04ae60eddd9169cec7eac6613b3ae [file] [log] [blame]
koder aka kdanilov4643fd62015-02-10 16:20:13 -08001import abc
koder aka kdanilovbc2c8982015-06-13 02:50:43 +03002import time
3import logging
koder aka kdanilov4643fd62015-02-10 16:20:13 -08004import os.path
koder aka kdanilov70227062016-11-26 23:23:21 +02005import datetime
koder aka kdanilov7f59d562016-12-26 01:34:23 +02006from typing import Dict, Any, List, Optional, Callable, cast
koder aka kdanilov652cd802015-04-13 12:21:07 +03007
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +02008from concurrent.futures import ThreadPoolExecutor, wait
koder aka kdanilov4643fd62015-02-10 16:20:13 -08009
koder aka kdanilov70227062016-11-26 23:23:21 +020010from ..utils import Barrier, StopTestError, sec_to_str
11from ..node_interfaces import IRPCNode
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020012from ..storage import Storage
koder aka kdanilov7f59d562016-12-26 01:34:23 +020013from ..result_classes import NodeTestResults, IStorable
14from queue import Queue
koder aka kdanilov70227062016-11-26 23:23:21 +020015
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030016
# Module-level logger; all messages emitted from this file go to the logger named "wally".
logger = logging.getLogger("wally")


# Module description, assigned explicitly because it does not sit at the top of the file.
__doc__ = "Contains base classes for performance tests"
21
22
class TestInputConfig:
    """
    Plain container describing the input configuration of one test.

    test_type - test type name
    params - parameters from yaml file for this test
    run_uuid - UUID of the current run
    nodes - nodes to run tests on
    storage - storage object the test keeps its results in
    remote_dir - directory on nodes to be used for local files
    """

    def __init__(self, test_type: str, params: Dict[str, Any], run_uuid: str,
                 nodes: List[IRPCNode], storage: Storage, remote_dir: str) -> None:
        # what to run and with which settings
        self.test_type = test_type
        self.params = params
        self.run_uuid = run_uuid

        # where to run it and where to keep the outcome
        self.nodes = nodes
        self.storage = storage
        self.remote_dir = remote_dir
koder aka kdanilov88407ff2015-05-26 15:35:57 +030047
48
class IterationConfig(IStorable):
    """Storable description of a single test iteration.

    Both attributes are declared here as ``None`` placeholders; actual values
    are expected to be provided by subclasses or instances.
    """

    # iteration name, used in log messages and in the stored iteration info
    name = None  # type: str

    # short iteration summary, used as a part of the result path in storage
    summary = None  # type: str
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030052
53
class PerfTest(metaclass=abc.ABCMeta):
    """Base class for all tests.

    Holds the test input configuration, the list of nodes to run on and an
    optional ``on_idle`` callback. Concrete tests implement ``run`` and
    ``format_for_console``.
    """

    # test name; placeholder here, expected to be set by concrete test classes
    name = None  # type: str

    # how many times a failing iteration is attempted before the test is aborted
    max_retry = 3

    # pause between two attempts, in seconds (passed to time.sleep)
    retry_time = 30

    def __init__(self, config: TestInputConfig, on_idle: Optional[Callable[[], None]] = None) -> None:
        """Remember the configuration and precompute the sorted node id list.

        config - test input configuration
        on_idle - optional callback without arguments; None disables it
        """
        self.config = config

        # only set by request_stop(); nothing in this class reads it
        self.stop_requested = False

        self.nodes = self.config.nodes  # type: List[IRPCNode]
        self.sorted_nodes_ids = sorted(node.info.node_id() for node in self.nodes)
        self.on_idle = on_idle

    def request_stop(self) -> None:
        """Raise the stop flag; it is up to the test implementation to honour it."""
        self.stop_requested = True

    def join_remote(self, path: str) -> str:
        """Return `path` placed inside the remote working directory from the config."""
        return os.path.join(self.config.remote_dir, path)

    @abc.abstractmethod
    def run(self) -> None:
        """Execute the test."""
        pass

    @abc.abstractmethod
    def format_for_console(self, data: Any) -> str:
        """Return a console representation of `data`."""
        pass
80
koder aka kdanilov4643fd62015-02-10 16:20:13 -080081
class ThreadedTest(PerfTest, metaclass=abc.ABCMeta):
    """Base class for tests, which spawn separated thread for each node.

    ``run`` skips iterations whose info record is already in storage, prepares
    all nodes, then executes every remaining iteration on all nodes in
    parallel, with retries, and stores an info record per iteration.
    """

    # max allowed time difference between starts and stops of run of the same test on different test nodes
    # used_max_diff = max((min_run_time * max_rel_time_diff), max_time_diff)
    max_time_diff = 5
    max_rel_time_diff = 0.05
    load_profile_name = None  # type: str

    def __init__(self, *args, **kwargs) -> None:
        PerfTest.__init__(self, *args, **kwargs)
        # a single None entry means "one unnamed iteration without a config"
        self.iterations_configs = [None]  # type: List[Optional[IterationConfig]]
        # items queued by store_data() and flushed by process_storage_queue()
        self.storage_q = Queue()  # type: Any

    @abc.abstractmethod
    def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
        """Return the expected run time of the iteration, or None if unknown."""
        pass

    @staticmethod
    def _iteration_name(iteration_config: Optional[IterationConfig]) -> str:
        """Name used for the iteration in logs and in the stored info record."""
        return "Unnamed" if iteration_config is None else iteration_config.name

    @staticmethod
    def _result_path(run_id: int, iteration_config: Optional[IterationConfig]) -> str:
        """Storage path prefix under which all data of one iteration is kept."""
        # an iteration without a config has no summary; fall back to the same
        # placeholder which is used for its name
        summary = "Unnamed" if iteration_config is None else iteration_config.summary
        return "result/{}_{}".format(summary, run_id)

    def _iteration_info(self, iteration_config: Optional[IterationConfig]) -> Dict[str, Any]:
        """Build the time-independent part of the info record of an iteration.

        The same dict is used both for storing the record and for validating
        a record found in storage, so the two can not diverge.
        """
        return {
            'suite': 'io',
            'test': self.name,
            'profile': self.load_profile_name,
            'iteration_name': self._iteration_name(iteration_config),
            'iteration_config': None if iteration_config is None else iteration_config.raw(),
            'params': self.config.params,
            'nodes': self.sorted_nodes_ids,
        }

    def get_not_done_stages(self, storage: Storage) -> Dict[int, Optional[IterationConfig]]:
        """Return {run_id: iteration config} for iterations missing in storage.

        For an iteration which is found in storage the stored info record
        (without begin/end time) must be equal to the record which would be
        written now; otherwise AssertionError is raised.
        """
        not_done = {}  # type: Dict[int, Optional[IterationConfig]]

        for run_id, iteration_config in enumerate(self.iterations_configs):
            # assumes storage joins the path parts given to put() with '/',
            # which run() relies on as well - TODO confirm against Storage
            info_path = "{}/info".format(self._result_path(run_id, iteration_config))

            if info_path not in storage:
                not_done[run_id] = iteration_config
                continue

            info = cast(Dict[str, Any], storage.get(info_path))  # type: Dict[str, Any]

            assert isinstance(info, dict), \
                "Broken storage at path {}. Expect test info dict, obtain {!r}".format(info_path, info)

            # times differ from run to run and are not part of the configuration
            info = info.copy()
            info.pop('begin_time', None)
            info.pop('end_time', None)

            expected_config = self._iteration_info(iteration_config)

            assert info == expected_config, \
                ("Test info at path {} is not equal to expected config." +
                 "Maybe configuration was changed before test was restarted. " +
                 "Current cfg is {!r}, expected cfg is {!r}").format(info_path, info, expected_config)

            logger.info("Test iteration {} found in storage and will be skipped"
                        .format(self._iteration_name(iteration_config)))

        return not_done

    def _run_iteration_with_retry(self,
                                  pool: ThreadPoolExecutor,
                                  iteration_config: Optional[IterationConfig],
                                  iter_name: str,
                                  result_path: str) -> List[NodeTestResults]:
        """Prepare and run one iteration on all nodes, at most max_retry times.

        An attempt is retried when it fails with EnvironmentError; after the
        last failed attempt StopTestError is raised. Returns per-node results
        of the first successful attempt.
        """
        results = []  # type: List[NodeTestResults]

        for idx in range(self.max_retry):
            logger.debug("Prepare iteration %s", iter_name)

            # prepare nodes for new iterations
            futures = [pool.submit(self.prepare_iteration, node, iteration_config) for node in self.nodes]
            wait(futures)
            for fut in futures:
                # wait() alone drops exceptions; result() re-raises a failed preparation
                fut.result()

            # run iteration
            logger.debug("Run iteration %s", iter_name)
            try:
                futures = []
                for node in self.nodes:
                    path = "{}/measurement/{}".format(result_path, node.info.node_id())
                    futures.append(pool.submit(self.run_iteration, node, iteration_config, path))

                # let every node finish before collecting, so a retry never
                # overlaps with a still running previous attempt
                wait(futures)
                results = [fut.result() for fut in futures]
                break
            except EnvironmentError:
                if self.max_retry - 1 == idx:
                    logger.exception("Fio failed")
                    raise StopTestError()
                logger.exception("During fio run")
                logger.info("Sleeping %ss and retrying", self.retry_time)
                time.sleep(self.retry_time)

        return results

    def run(self) -> None:
        """Run all iterations which are not in storage yet and store their info."""
        not_in_storage = self.get_not_done_stages(self.config.storage)

        if not not_in_storage:
            logger.info("All test iteration in storage already. Skip test")
            return

        logger.debug("Run test io.{} with profile {!r} on nodes {}.".format(self.name,
                                                                          self.load_profile_name,
                                                                          ",".join(self.sorted_nodes_ids)))
        logger.debug("Prepare nodes")

        with ThreadPoolExecutor(len(self.nodes)) as pool:
            # list() forces all config_node calls to finish and re-raises their errors
            list(pool.map(self.config_node, self.nodes))

            # +5% - is a rough estimation for additional operations
            run_times = [self.get_expected_runtime(iteration_config) for iteration_config in not_in_storage.values()]
            if None not in run_times:
                expected_run_time = int(sum(run_times) * 1.05)
                exec_time_s = sec_to_str(expected_run_time)
                now_dt = datetime.datetime.now()
                end_dt = now_dt + datetime.timedelta(0, expected_run_time)
                logger.info("Entire test should takes aroud: {} and finished at {:%H:%M:%S}"
                            .format(exec_time_s, end_dt))

            for run_id, iteration_config in sorted(not_in_storage.items()):
                iter_name = self._iteration_name(iteration_config)
                logger.info("Run test iteration %s", iter_name)

                current_result_path = self._result_path(run_id, iteration_config)
                results = self._run_iteration_with_retry(pool, iteration_config, iter_name, current_result_path)

                start_times = []  # type: List[int]
                stop_times = []  # type: List[int]

                # TODO: FIX result processing - NodeTestResults
                for result in results:
                    for serie in result.series.values():
                        start_times.append(serie.start_at)
                        # stop time = start + duration; assumes start_at and step
                        # use the same time unit - TODO confirm
                        stop_times.append(serie.start_at + serie.step * len(serie.data))

                # NOTE: min()/max() raise ValueError if no series were returned
                min_start_time = min(start_times)
                max_start_time = max(start_times)
                min_stop_time = min(stop_times)
                max_stop_time = max(stop_times)

                # (min_stop_time - max_start_time) is the time span when all series were running
                max_allowed_time_diff = int((min_stop_time - max_start_time) * self.max_rel_time_diff)
                max_allowed_time_diff = max(max_allowed_time_diff, self.max_time_diff)

                start_diff = max_start_time - min_start_time
                if start_diff > max_allowed_time_diff:
                    logger.warning("Too large difference in {}:{} start time - {}. Max recommended difference is {}"
                                   .format(self.name, iter_name, start_diff, max_allowed_time_diff))

                stop_diff = max_stop_time - min_stop_time
                if stop_diff > max_allowed_time_diff:
                    logger.warning("Too large difference in {}:{} stop time - {}. Max recommended difference is {}"
                                   .format(self.name, iter_name, stop_diff, max_allowed_time_diff))

                test_config = self._iteration_info(iteration_config)
                test_config['begin_time'] = min_start_time
                test_config['end_time'] = max_stop_time

                # flush data queued by worker threads before the info record,
                # as presence of the info record marks the iteration as done
                self.process_storage_queue()
                self.config.storage.put(test_config, current_result_path, "info")
                self.config.storage.sync()

                if self.on_idle is not None:
                    self.on_idle()

    def store_data(self, val: Any, type: str, prefix: str, *path: str) -> None:
        """Queue `val` to be stored later by process_storage_queue().

        type - one of 'raw', 'yaml' or 'array', selects the storage method
        prefix, path - storage path parts
        """
        self.storage_q.put((val, type, prefix, path))

    def process_storage_queue(self) -> None:
        """Write all queued items to storage.

        Raises StopTestError if an item has an unknown type.
        """
        while not self.storage_q.empty():
            value, val_type, subpath, val_path = self.storage_q.get()
            if val_type == 'raw':
                self.config.storage.put_raw(value, subpath, *val_path)
            elif val_type == 'yaml':
                self.config.storage.put(value, subpath, *val_path)
            elif val_type == 'array':
                self.config.storage.put_array(value, subpath, *val_path)
            else:
                # report the offending type, not the path of the item
                logger.error("Internal logic error - unknown data stop type {!r}".format(val_type))
                raise StopTestError()

    @abc.abstractmethod
    def config_node(self, node: IRPCNode) -> None:
        """One-time preparation of a node, called once per node before all iterations."""
        pass

    @abc.abstractmethod
    def prepare_iteration(self, node: IRPCNode, iter_config: IterationConfig) -> None:
        """Prepare a node for one attempt of the given iteration."""
        pass

    @abc.abstractmethod
    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, stor_prefix: str) -> NodeTestResults:
        """Run the iteration on a node; `stor_prefix` is the storage path for its measurements."""
        pass
262
263
class TwoScriptTest(ThreadedTest, metaclass=abc.ABCMeta):
    """Test made of two scripts taken from params: a pre-run script and a run script.

    Both scripts are copied to every node; the pre-run script is executed once
    per node during node configuration, the run script on every iteration.
    Subclasses turn the run script output into results via ``parse_results``.
    """

    def __init__(self, *args, **kwargs) -> None:
        ThreadedTest.__init__(self, *args, **kwargs)
        params = self.config.params

        # both script names are mandatory, timeouts default to 3600
        self.prerun_script = params['prerun_script']
        self.run_script = params['run_script']
        self.prerun_tout = params.get('prerun_tout', 3600)
        self.run_tout = params.get('run_tout', 3600)

        # single iteration without a config
        self.iterations_configs = [None]

    def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
        """Run time of an arbitrary script is not known, so always None."""
        return None

    def config_node(self, node: IRPCNode) -> None:
        """Copy both scripts to the node and execute the pre-run script there."""
        for script in (self.run_script, self.prerun_script):
            node.copy_file(script, self.join_remote(script))

        remote_prerun = self.join_remote(self.prerun_script)
        prerun_opts = self.config.params.get('prerun_opts', '')
        node.run(remote_prerun + ' ' + prerun_opts, timeout=self.prerun_tout)

    def prepare_iteration(self, node: IRPCNode, iter_config: IterationConfig) -> None:
        """Nothing to prepare between iterations."""
        pass

    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, stor_prefix: str) -> NodeTestResults:
        """Execute the run script on the node and return its parsed output."""
        # TODO: have to store logs
        remote_run = self.join_remote(self.run_script)
        run_opts = self.config.params.get('run_opts', '')
        output = node.run(remote_run + ' ' + run_opts, timeout=self.run_tout)
        return self.parse_results(output)

    @abc.abstractmethod
    def parse_results(self, data: str) -> NodeTestResults:
        """Convert the run script output into node test results."""
        pass
296