blob: 8636596d2e9a1cf274cf47e58a0c180270ccb25e [file] [log] [blame]
koder aka kdanilov4643fd62015-02-10 16:20:13 -08001import abc
koder aka kdanilovbc2c8982015-06-13 02:50:43 +03002import time
3import logging
koder aka kdanilov4643fd62015-02-10 16:20:13 -08004import os.path
koder aka kdanilov70227062016-11-26 23:23:21 +02005import datetime
6from typing import Dict, Any, List, Optional, Tuple, cast
koder aka kdanilov652cd802015-04-13 12:21:07 +03007
koder aka kdanilovbc2c8982015-06-13 02:50:43 +03008from concurrent.futures import ThreadPoolExecutor
koder aka kdanilov4643fd62015-02-10 16:20:13 -08009
koder aka kdanilov70227062016-11-26 23:23:21 +020010from ..utils import Barrier, StopTestError, sec_to_str
11from ..node_interfaces import IRPCNode
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020012from ..storage import Storage
koder aka kdanilov70227062016-11-26 23:23:21 +020013from ..result_classes import RawTestResults
14
15import agent
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030016
17
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030018logger = logging.getLogger("wally")
koder aka kdanilov88407ff2015-05-26 15:35:57 +030019
20
koder aka kdanilov70227062016-11-26 23:23:21 +020021__doc__ = "Contains base classes for performance tests"
22
23
24class TestInputConfig:
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030025 """
26 this class describe test input configuration
koder aka kdanilov88407ff2015-05-26 15:35:57 +030027
koder aka kdanilov70227062016-11-26 23:23:21 +020028 test_type - test type name
29 params - parameters from yaml file for this test
30 test_uuid - UUID to be used to create file names & Co
31 log_directory - local directory to store results
32 nodes - nodes to run tests on
33 remote_dir - directory on nodes to be used for local files
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030034 """
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030035 def __init__(self,
36 test_type: str,
37 params: Dict[str, Any],
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020038 run_uuid: str,
koder aka kdanilov70227062016-11-26 23:23:21 +020039 nodes: List[IRPCNode],
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020040 storage: Storage,
koder aka kdanilov70227062016-11-26 23:23:21 +020041 remote_dir: str) -> None:
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030042 self.test_type = test_type
43 self.params = params
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020044 self.run_uuid = run_uuid
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030045 self.nodes = nodes
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020046 self.storage = storage
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030047 self.remote_dir = remote_dir
koder aka kdanilov88407ff2015-05-26 15:35:57 +030048
49
koder aka kdanilov70227062016-11-26 23:23:21 +020050class IterationConfig:
51 name = None # type: str
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030052
53
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030054class PerfTest:
koder aka kdanilov70227062016-11-26 23:23:21 +020055 """Base class for all tests"""
56 name = None # type: str
57 max_retry = 3
58 retry_time = 30
59
60 def __init__(self, config: TestInputConfig) -> None:
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030061 self.config = config
koder aka kdanilove2de58c2015-04-24 22:59:36 +030062 self.stop_requested = False
koder aka kdanilov70227062016-11-26 23:23:21 +020063 self.nodes = self.config.nodes # type: List[IRPCNode]
64 self.sorted_nodes_ids = sorted(node.info.node_id() for node in self.nodes)
koder aka kdanilove2de58c2015-04-24 22:59:36 +030065
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030066 def request_stop(self) -> None:
koder aka kdanilove2de58c2015-04-24 22:59:36 +030067 self.stop_requested = True
koder aka kdanilov2066daf2015-04-23 21:05:41 +030068
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030069 def join_remote(self, path: str) -> str:
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030070 return os.path.join(self.config.remote_dir, path)
koder aka kdanilov4500a5f2015-04-17 16:55:17 +030071
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030072 @abc.abstractmethod
koder aka kdanilov70227062016-11-26 23:23:21 +020073 def run(self, storage: Storage) -> None:
koder aka kdanilov4643fd62015-02-10 16:20:13 -080074 pass
75
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030076 @abc.abstractmethod
koder aka kdanilov39e449e2016-12-17 15:15:26 +020077 def format_for_console(self, data: Any) -> str:
koder aka kdanilovec1b9732015-04-23 20:43:29 +030078 pass
79
koder aka kdanilov4643fd62015-02-10 16:20:13 -080080
koder aka kdanilov70227062016-11-26 23:23:21 +020081RunTestRes = Tuple[RawTestResults, Tuple[int, int]]
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030082
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030083
koder aka kdanilov70227062016-11-26 23:23:21 +020084class ThreadedTest(PerfTest, metaclass=abc.ABCMeta):
85 """Base class for tests, which spawn separated thread for each node"""
86
87 # max allowed time difference between starts and stops of run of the same test on different test nodes
88 # used_max_diff = max((min_run_time * max_rel_time_diff), max_time_diff)
89 max_time_diff = 5
90 max_rel_time_diff = 0.05
91
92 def __init__(self, config: TestInputConfig) -> None:
93 PerfTest.__init__(self, config)
94 self.iterations_configs = [None] # type: List[Optional[IterationConfig]]
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030095
96 @abc.abstractmethod
koder aka kdanilov70227062016-11-26 23:23:21 +020097 def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030098 pass
99
koder aka kdanilov70227062016-11-26 23:23:21 +0200100 def get_not_done_stages(self, storage: Storage) -> Dict[int, IterationConfig]:
101 start_run_id = max(int(name) for _, name in storage.list('result')) + 1
102 not_in_storage = {} # type: Dict[int, IterationConfig]
103 for run_id, iteration_config in enumerate(self.iterations_configs, start_run_id):
104 info_path = "result/{}/info".format(run_id)
105 if info_path in storage:
106 info = cast(Dict[str, Any], storage[info_path]) # type: Dict[str, Any]
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300107
koder aka kdanilov70227062016-11-26 23:23:21 +0200108 assert isinstance(info, dict), \
109 "Broken storage at path {}. Expect test info dict, obtain {!r}".format(info_path, info)
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300110
koder aka kdanilov70227062016-11-26 23:23:21 +0200111 info = info.copy()
112 del info['begin_time']
113 del info['end_time']
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300114
koder aka kdanilov70227062016-11-26 23:23:21 +0200115 iter_name = "Unnamed" if iteration_config is None else iteration_config.name
116 expected_config = {
117 'name': self.name,
118 'iteration_name': iter_name,
119 'iteration_config': iteration_config,
120 'params': self.config.params,
121 'nodes': self.sorted_nodes_ids
122 }
123
124 assert info == expected_config, \
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200125 ("Test info at path {} is not equal to expected config." +
126 "Maybe configuration was changed before test was restarted. " +
127 "Current cfg is {!r}, expected cfg is {!r}").format(info_path, info, expected_config)
koder aka kdanilov70227062016-11-26 23:23:21 +0200128
129 logger.info("Test iteration {} found in storage and will be skipped".format(iter_name))
130 else:
131 not_in_storage[run_id] = iteration_config
132 return not_in_storage
133
134 def run(self, storage: Storage) -> None:
135 not_in_storage = self.get_not_done_stages(storage)
136
137 if not not_in_storage:
138 logger.info("All test iteration in storage already. Skip test")
139 return
140
141 logger.debug("Run test {} on nodes {}.".format(self.name, ",".join(self.sorted_nodes_ids)))
142
143 barrier = Barrier(len(self.nodes))
144
145 logger.debug("Run preparation")
146
147 with ThreadPoolExecutor(len(self.nodes)) as pool:
148 list(pool.map(self.config_node, self.nodes))
149
150 # +5% - is a rough estimation for additional operations
151 run_times = [self.get_expected_runtime(iteration_config) for iteration_config in not_in_storage.values()]
152 if None not in run_times:
153 expected_run_time = int(sum(run_times) * 1.05)
154 exec_time_s = sec_to_str(expected_run_time)
155 now_dt = datetime.datetime.now()
156 end_dt = now_dt + datetime.timedelta(0, expected_run_time)
157 logger.info("Entire test should takes aroud: {} and finished at {:%H:%M:%S}"
158 .format(exec_time_s, end_dt))
159
160 for run_id, iteration_config in sorted(not_in_storage.items()):
161 iter_name = "Unnamed" if iteration_config is None else iteration_config.name
162 logger.info("Run test iteration {} ".format(iter_name))
163
164 results = [] # type: List[RunTestRes]
165 for idx in range(self.max_retry):
166 barrier.wait()
167 try:
168 futures = [pool.submit(self.do_test, node, iteration_config) for node in self.nodes]
169 results = [fut.result() for fut in futures]
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200170 except EnvironmentError as exc:
koder aka kdanilov70227062016-11-26 23:23:21 +0200171 if self.max_retry - 1 == idx:
172 raise StopTestError("Fio failed") from exc
173 logger.exception("During fio run")
174 else:
175 if all(results):
176 break
177
178 logger.info("Sleeping %ss and retrying", self.retry_time)
179 time.sleep(self.retry_time)
180
181 start_times = [] # type: List[int]
182 stop_times = [] # type: List[int]
183
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200184 mstorage = storage.sub_storage("result", str(run_id), "measurement")
koder aka kdanilov70227062016-11-26 23:23:21 +0200185 for (result, (t_start, t_stop)), node in zip(results, self.config.nodes):
186 for metrics_name, data in result.items():
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200187 mstorage[node.info.node_id(), metrics_name] = data # type: ignore
koder aka kdanilov70227062016-11-26 23:23:21 +0200188 start_times.append(t_start)
189 stop_times.append(t_stop)
190
191 min_start_time = min(start_times)
192 max_start_time = max(start_times)
193 min_stop_time = min(stop_times)
194 max_stop_time = max(stop_times)
195
196 max_allowed_time_diff = int((min_stop_time - max_start_time) * self.max_rel_time_diff)
197 max_allowed_time_diff = max(max_allowed_time_diff, self.max_time_diff)
198
199 if min_start_time + self.max_time_diff < max_allowed_time_diff:
200 logger.warning("Too large difference in {}:{} start time - {}. Max recommended difference is {}"
201 .format(self.name, iter_name, max_start_time - min_start_time, self.max_time_diff))
202
203 if min_stop_time + self.max_time_diff < max_allowed_time_diff:
204 logger.warning("Too large difference in {}:{} stop time - {}. Max recommended difference is {}"
205 .format(self.name, iter_name, max_start_time - min_start_time, self.max_time_diff))
206
207 test_config = {
208 'name': self.name,
209 'iteration_name': iter_name,
210 'iteration_config': iteration_config,
211 'params': self.config.params,
212 'nodes': self.sorted_nodes_ids,
213 'begin_time': min_start_time,
214 'end_time': max_stop_time
215 }
216
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200217 storage["result", str(run_id), "info"] = test_config # type: ignore
koder aka kdanilov70227062016-11-26 23:23:21 +0200218
219 @abc.abstractmethod
220 def config_node(self, node: IRPCNode) -> None:
221 pass
222
223 @abc.abstractmethod
224 def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300225 pass
226
227
koder aka kdanilov70227062016-11-26 23:23:21 +0200228class TwoScriptTest(ThreadedTest, metaclass=abc.ABCMeta):
229 def __init__(self, *dt, **mp) -> None:
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300230 ThreadedTest.__init__(self, *dt, **mp)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300231 self.prerun_script = self.config.params['prerun_script']
232 self.run_script = self.config.params['run_script']
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300233 self.prerun_tout = self.config.params.get('prerun_tout', 3600)
234 self.run_tout = self.config.params.get('run_tout', 3600)
koder aka kdanilov70227062016-11-26 23:23:21 +0200235 self.iterations_configs = [None]
Yulia Portnova7ddfa732015-02-24 17:32:58 +0200236
koder aka kdanilov70227062016-11-26 23:23:21 +0200237 def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
238 return None
Yulia Portnova7ddfa732015-02-24 17:32:58 +0200239
koder aka kdanilov70227062016-11-26 23:23:21 +0200240 def config_node(self, node: IRPCNode) -> None:
241 node.copy_file(self.run_script, self.join_remote(self.run_script))
242 node.copy_file(self.prerun_script, self.join_remote(self.prerun_script))
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300243
koder aka kdanilov70227062016-11-26 23:23:21 +0200244 cmd = self.join_remote(self.prerun_script)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300245 cmd += ' ' + self.config.params.get('prerun_opts', '')
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300246 node.run(cmd, timeout=self.prerun_tout)
Yulia Portnova7ddfa732015-02-24 17:32:58 +0200247
koder aka kdanilov70227062016-11-26 23:23:21 +0200248 def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
249 cmd = self.join_remote(self.run_script)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300250 cmd += ' ' + self.config.params.get('run_opts', '')
251 t1 = time.time()
koder aka kdanilov70227062016-11-26 23:23:21 +0200252 res = self.parse_results(node.run(cmd, timeout=self.run_tout))
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300253 t2 = time.time()
koder aka kdanilov70227062016-11-26 23:23:21 +0200254 return res, (int(t1), int(t2))
255
256 @abc.abstractmethod
257 def parse_results(self, data: str) -> RawTestResults:
258 pass
259