# blob: aae475c232ade0d3d73659a5adf8645f1fcff1b1
import abc
import time
import logging
import os.path
import datetime
from typing import Dict, Any, List, Optional, Tuple, cast

from concurrent.futures import ThreadPoolExecutor, wait

from ..utils import Barrier, StopTestError, sec_to_str
from ..node_interfaces import IRPCNode
from ..storage import Storage
from ..result_classes import NodeTestResults, TimeSerie


# Module-wide logger; every message from this file goes to the "wally" logger.
logger = logging.getLogger("wally")


__doc__ = "Contains base classes for performance tests"


class TestInputConfig:
    """
    This class describes test input configuration.

    test_type - test type name
    params - parameters from yaml file for this test
    run_uuid - UUID of the run, to be used to create file names & Co
    nodes - nodes to run tests on
    storage - storage object the test reads previous results from and writes new results to
    remote_dir - directory on nodes to be used for local files
    """
    def __init__(self,
                 test_type: str,
                 params: Dict[str, Any],
                 run_uuid: str,
                 nodes: List[IRPCNode],
                 storage: Storage,
                 remote_dir: str) -> None:
        self.test_type = test_type
        self.params = params
        self.run_uuid = run_uuid
        self.nodes = nodes
        self.storage = storage
        self.remote_dir = remote_dir


class IterationConfig:
    """Description of a single test iteration.

    Both attributes default to None at class level and are expected to be
    filled in per instance.
    """
    # human-readable iteration name, used in log messages and in the stored test info
    name = None  # type: str
    # short identifier used to build the "result/<summary>_<run_id>/..." storage path
    summary = None  # type: str


class PerfTest:
    """Base class for all tests.

    NOTE: `run` and `format_for_console` are marked with `abc.abstractmethod`,
    but this class itself does not use `abc.ABCMeta`, so the abstractness is
    only enforced in subclasses that declare that metaclass.
    """
    # test name; None here, presumably overridden by concrete test classes
    name = None  # type: str
    # how many times an iteration is attempted before the test is aborted
    max_retry = 3
    # pause between attempts, in seconds (passed to time.sleep by ThreadedTest)
    retry_time = 30

    def __init__(self, config: TestInputConfig) -> None:
        """Remember the config and precompute the sorted list of node ids."""
        self.config = config
        self.stop_requested = False
        self.nodes = self.config.nodes  # type: List[IRPCNode]
        self.sorted_nodes_ids = sorted(node.info.node_id() for node in self.nodes)

    def request_stop(self) -> None:
        """Raise the `stop_requested` flag."""
        self.stop_requested = True

    def join_remote(self, path: str) -> str:
        """Return `path` joined onto the configured remote directory."""
        return os.path.join(self.config.remote_dir, path)

    @abc.abstractmethod
    def run(self) -> None:
        """Execute the test."""
        pass

    @abc.abstractmethod
    def format_for_console(self, data: Any) -> str:
        """Render `data` as text for console output."""
        pass


class ThreadedTest(PerfTest, metaclass=abc.ABCMeta):
    """Base class for tests, which spawn separated thread for each node"""

    # max allowed time difference between starts and stops of run of the same test on different test nodes
    # used_max_diff = max((min_run_time * max_rel_time_diff), max_time_diff)
    max_time_diff = 5
    max_rel_time_diff = 0.05

    def __init__(self, config: TestInputConfig) -> None:
        PerfTest.__init__(self, config)
        # a single None entry means "one unnamed iteration"
        self.iterations_configs = [None]  # type: List[Optional[IterationConfig]]

    @abc.abstractmethod
    def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
        """Return the expected iteration run time, or None when it is unknown."""
        pass

    def get_not_done_stages(self, storage: Storage) -> Dict[int, IterationConfig]:
        """Return {run_id: iteration config} for iterations that still have to be run.

        Iterations whose "result/<run_id>/info" entry is already in `storage`
        are skipped, after checking that the stored info matches the current
        configuration.

        Raises AssertionError when a stored info entry is not a dict or does
        not match the current configuration.
        """
        # The run id is the last "_"-separated token of the directory name.
        # rsplit keeps this correct when the summary itself contains "_" and
        # also accepts a bare "<run_id>" directory name.
        done_stages = [int(name_id.rsplit("_", 1)[-1])
                       for is_leaf, name_id in storage.list('result')
                       if not is_leaf]
        start_run_id = max(done_stages) + 1 if done_stages else 0

        not_in_storage = {}  # type: Dict[int, IterationConfig]

        for run_id, iteration_config in enumerate(self.iterations_configs[start_run_id:], start_run_id):
            info_path = "result/{}/info".format(run_id)
            if info_path not in storage:
                not_in_storage[run_id] = iteration_config
                continue

            info = cast(Dict[str, Any], storage[info_path])  # type: Dict[str, Any]

            assert isinstance(info, dict), \
                "Broken storage at path {}. Expect test info dict, obtain {!r}".format(info_path, info)

            # timing fields differ from run to run, so they are excluded from the comparison
            info = info.copy()
            del info['begin_time']
            del info['end_time']

            iter_name = "Unnamed" if iteration_config is None else iteration_config.name
            expected_config = {
                'name': self.name,
                'iteration_name': iter_name,
                'iteration_config': iteration_config,
                'params': self.config.params,
                'nodes': self.sorted_nodes_ids
            }

            assert info == expected_config, \
                ("Test info at path {} is not equal to expected config." +
                 "Maybe configuration was changed before test was restarted. " +
                 "Current cfg is {!r}, expected cfg is {!r}").format(info_path, info, expected_config)

            logger.info("Test iteration {} found in storage and will be skipped".format(iter_name))
        return not_in_storage

    def run(self) -> None:
        """Run every iteration that is not in storage yet and store its info record.

        Raises StopTestError when an iteration keeps failing with
        EnvironmentError after `max_retry` attempts.
        """
        not_in_storage = self.get_not_done_stages(self.config.storage)

        if not not_in_storage:
            logger.info("All test iteration in storage already. Skip test")
            return

        logger.debug("Run test {} on nodes {}.".format(self.name, ",".join(self.sorted_nodes_ids)))
        logger.debug("Prepare nodes")

        # The pool must stay open for the whole run: it is used both for node
        # configuration and for every iteration below.
        with ThreadPoolExecutor(len(self.nodes)) as pool:
            list(pool.map(self.config_node, self.nodes))

            self._log_expected_runtime(not_in_storage)

            for run_id, iteration_config in sorted(not_in_storage.items()):
                iter_name = "Unnamed" if iteration_config is None else iteration_config.name
                logger.info("Run test iteration %s", iter_name)

                results = self._run_with_retries(pool, run_id, iteration_config, iter_name)

                start_times = []  # type: List[int]
                stop_times = []  # type: List[int]

                # TODO: FIX result processing - NodeTestResults
                for result in results:
                    # NOTE(review): mstorage is not used yet; the call is kept because
                    # sub_storage may have side effects on the storage - confirm.
                    mstorage = self.config.storage.sub_storage("result/{}_{}/measurement/{}"
                                                               .format(result.summary, run_id, result.node_id))
                    for _name, serie in result.series.items():
                        start_times.append(serie.start_at)
                        # NOTE(review): this looks like a duration rather than a stop
                        # time (start_at is not added) - confirm against TimeSerie.
                        stop_times.append(serie.step * len(serie.data))

                min_start_time = min(start_times)
                max_stop_time = max(stop_times)

                self._warn_on_time_skew(iter_name, start_times, stop_times)

                test_config = {
                    'name': self.name,
                    'iteration_name': iter_name,
                    'iteration_config': iteration_config,
                    'params': self.config.params,
                    'nodes': self.sorted_nodes_ids,
                    'begin_time': min_start_time,
                    'end_time': max_stop_time
                }

                self.config.storage["result", str(run_id), "info"] = test_config  # type: ignore

    def _log_expected_runtime(self, not_in_storage: Dict[int, IterationConfig]) -> None:
        """Log the estimated total run time, if every iteration can provide one."""
        # +5% - is a rough estimation for additional operations
        run_times = [self.get_expected_runtime(iteration_config) for iteration_config in not_in_storage.values()]
        if None in run_times:
            return

        expected_run_time = int(sum(run_times) * 1.05)
        exec_time_s = sec_to_str(expected_run_time)
        now_dt = datetime.datetime.now()
        end_dt = now_dt + datetime.timedelta(0, expected_run_time)
        logger.info("Entire test should takes aroud: {} and finished at {:%H:%M:%S}"
                    .format(exec_time_s, end_dt))

    def _run_with_retries(self,
                          pool: ThreadPoolExecutor,
                          run_id: int,
                          iteration_config: Optional[IterationConfig],
                          iter_name: str) -> List[NodeTestResults]:
        """Prepare and run one iteration on all nodes, retrying on EnvironmentError."""
        # iterations_configs may contain None (unnamed iteration), so the summary
        # needs the same fallback as the iteration name
        iter_summary = "Unnamed" if iteration_config is None else iteration_config.summary

        for idx in range(self.max_retry):
            logger.debug("Prepare iteration %s", iter_name)

            # prepare nodes for new iterations
            # NOTE(review): results of these futures are never retrieved, so an
            # exception raised by prepare_iteration is not propagated - confirm intent.
            futures = [pool.submit(self.prepare_iteration, node, iteration_config) for node in self.nodes]
            wait(futures)

            # run iteration
            logger.debug("Run iteration %s", iter_name)
            try:
                futures = []
                for node in self.nodes:
                    path = "result/{}_{}/measurement/{}".format(iter_summary,
                                                                run_id,
                                                                node.info.node_id())
                    self.config.storage.clear(path)
                    mstorage = self.config.storage.sub_storage(path)
                    futures.append(pool.submit(self.run_iteration, node, iteration_config, mstorage))

                return [fut.result() for fut in futures]
            except EnvironmentError:
                if self.max_retry - 1 == idx:
                    logger.exception("Fio failed")
                    raise StopTestError()
                logger.exception("During fio run")
                logger.info("Sleeping %ss and retrying", self.retry_time)
                time.sleep(self.retry_time)

        # only reachable when max_retry is 0
        return []

    def _warn_on_time_skew(self, iter_name: str, start_times: List[int], stop_times: List[int]) -> None:
        """Warn when nodes started or stopped too far apart from each other."""
        start_diff = max(start_times) - min(start_times)
        stop_diff = max(stop_times) - min(stop_times)

        # used_max_diff = max((min_run_time * max_rel_time_diff), max_time_diff)
        max_allowed_time_diff = int((min(stop_times) - max(start_times)) * self.max_rel_time_diff)
        max_allowed_time_diff = max(max_allowed_time_diff, self.max_time_diff)

        if start_diff > max_allowed_time_diff:
            logger.warning("Too large difference in {}:{} start time - {}. Max recommended difference is {}"
                           .format(self.name, iter_name, start_diff, max_allowed_time_diff))

        if stop_diff > max_allowed_time_diff:
            logger.warning("Too large difference in {}:{} stop time - {}. Max recommended difference is {}"
                           .format(self.name, iter_name, stop_diff, max_allowed_time_diff))

    @abc.abstractmethod
    def config_node(self, node: IRPCNode) -> None:
        """One-time node setup, called once per node before any iteration."""
        pass

    @abc.abstractmethod
    def prepare_iteration(self, node: IRPCNode, iter_config: IterationConfig) -> None:
        """Per-node preparation, called before every attempt of an iteration."""
        pass

    @abc.abstractmethod
    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
        """Run one iteration on `node`; `substorage` is the node's measurement sub-storage."""
        pass


class TwoScriptTest(ThreadedTest, metaclass=abc.ABCMeta):
    """Test driven by two scripts: a pre-run script and a run script.

    Script paths, options and timeouts come from the test params:
    'prerun_script' and 'run_script' are required, while 'prerun_opts',
    'run_opts', 'prerun_tout' and 'run_tout' are optional (timeouts default
    to 3600).
    """
    def __init__(self, *dt, **mp) -> None:
        ThreadedTest.__init__(self, *dt, **mp)
        self.prerun_script = self.config.params['prerun_script']
        self.run_script = self.config.params['run_script']
        self.prerun_tout = self.config.params.get('prerun_tout', 3600)
        self.run_tout = self.config.params.get('run_tout', 3600)
        # single unnamed iteration
        self.iterations_configs = [None]

    def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
        """Run time of an arbitrary script is unknown, so always return None."""
        return None

    def config_node(self, node: IRPCNode) -> None:
        """Copy both scripts to the node and execute the pre-run script there."""
        node.copy_file(self.run_script, self.join_remote(self.run_script))
        node.copy_file(self.prerun_script, self.join_remote(self.prerun_script))

        cmd = self.join_remote(self.prerun_script)
        cmd += ' ' + self.config.params.get('prerun_opts', '')
        node.run(cmd, timeout=self.prerun_tout)

    def prepare_iteration(self, node: IRPCNode, iter_config: IterationConfig) -> None:
        """Nothing to prepare per iteration."""
        pass

    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
        """Execute the run script on `node` and parse what it returns."""
        # TODO: have to store logs
        cmd = self.join_remote(self.run_script)
        cmd += ' ' + self.config.params.get('run_opts', '')
        return self.parse_results(node.run(cmd, timeout=self.run_tout))

    @abc.abstractmethod
    def parse_results(self, data: str) -> NodeTestResults:
        """Convert the value returned by running the run script into NodeTestResults."""
        pass

281