blob: bc6b1159f06f76cee7eb854feffb59ae66a2c611 [file] [log] [blame]
koder aka kdanilovf2865172016-12-30 03:35:11 +02001import re
koder aka kdanilov4643fd62015-02-10 16:20:13 -08002import abc
koder aka kdanilovbc2c8982015-06-13 02:50:43 +03003import time
koder aka kdanilovf2865172016-12-30 03:35:11 +02004import array
5import struct
koder aka kdanilovbc2c8982015-06-13 02:50:43 +03006import logging
koder aka kdanilov4643fd62015-02-10 16:20:13 -08007import os.path
koder aka kdanilov70227062016-11-26 23:23:21 +02008import datetime
koder aka kdanilovf2865172016-12-30 03:35:11 +02009from typing import Any, List, Optional, Callable, cast, Iterator, Tuple, Iterable
koder aka kdanilov652cd802015-04-13 12:21:07 +030010
koder aka kdanilovf2865172016-12-30 03:35:11 +020011from concurrent.futures import ThreadPoolExecutor, wait, Future
koder aka kdanilov4643fd62015-02-10 16:20:13 -080012
koder aka kdanilovf2865172016-12-30 03:35:11 +020013from ..utils import StopTestError, sec_to_str, get_time_interval_printable_info
koder aka kdanilov70227062016-11-26 23:23:21 +020014from ..node_interfaces import IRPCNode
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020015from ..storage import Storage
koder aka kdanilovf2865172016-12-30 03:35:11 +020016from ..result_classes import TestSuiteConfig, TestJobConfig, JobMetrics, TimeSeries
koder aka kdanilov70227062016-11-26 23:23:21 +020017
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030018
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030019logger = logging.getLogger("wally")
koder aka kdanilov88407ff2015-05-26 15:35:57 +030020
21
koder aka kdanilov70227062016-11-26 23:23:21 +020022__doc__ = "Contains base classes for performance tests"
23
24
class ResultStorage:
    """Reads and writes suite configs, job configs and time series via a ``Storage`` object.

    Path layout, as produced by the helpers of this class:

        results/<suite_type>_<idx>/config.yml
        results/<suite_type>_<idx>/<summary>_<run_id>/config.yml
        results/<suite_type>_<idx>/<summary>_<run_id>/<node_id>_<dev>.<sensor_name>
    """

    # Binary header of a stored time series: network byte order, three unsigned
    # ints (second_axis_size, len(data), len(times)) and two single-byte
    # ``array`` typecodes (for data and for times).
    ts_header_format = "!IIIcc"

    def __init__(self, storage: Storage, job_config_cls: type) -> None:
        """Remember the storage and the class used to load job configs."""
        self.storage = storage
        self.job_config_cls = job_config_cls

    def get_suite_root(self, suite_type: str, idx: int) -> str:
        """Return the storage path of the suite with given type and index."""
        return "results/{}_{}".format(suite_type, idx)

    def get_job_root(self, suite_root: str, summary: str, run_id: int) -> str:
        """Return the storage path of a job inside ``suite_root``."""
        return "{}/{}_{}".format(suite_root, summary, run_id)

    # store
    def put_suite_config(self, config: TestSuiteConfig, root: str) -> None:
        """Store the suite config as ``config.yml`` under ``root``."""
        self.storage.put(config, root, "config.yml")

    def put_job_config(self, config: TestJobConfig, root: str) -> None:
        """Store the job config as ``config.yml`` under ``root``."""
        self.storage.put(config, root, "config.yml")

    def get_suite_config(self, suite_root: str) -> TestSuiteConfig:
        """Load the suite config stored under ``suite_root``."""
        return self.storage.load(TestSuiteConfig, suite_root, "config.yml")

    def get_job_node_prefix(self, job_root_path: str, node_id: str) -> str:
        """Return the common path prefix of all per-node files of a job."""
        return "{}/{}".format(job_root_path, node_id)

    def get_ts_path(self, job_root_path: str, node_id: str, dev: str, sensor_name: str) -> str:
        """Return the path of the time series file for (node, device, sensor)."""
        return "{}_{}.{}".format(self.get_job_node_prefix(job_root_path, node_id), dev, sensor_name)

    def put_ts(self, ts: TimeSeries, job_root_path: str, node_id: str, dev: str, sensor_name: str) -> None:
        """Write a time series as a binary header followed by data and times arrays.

        Raises StopTestError when the number of data rows
        (``len(ts.data) / ts.second_axis_size``) differs from ``len(ts.times)``.
        If ``ts.raw`` is set it is stored next to the series under ``<path>:raw``.
        """
        # TODO: check that 'metrics', 'dev' and 'node_id' match required patterns
        root_path = self.get_ts_path(job_root_path, node_id, dev, sensor_name)

        if len(ts.data) / ts.second_axis_size != len(ts.times):
            # '%s' placeholders: a bare '%' is not a valid logging format specifier
            logger.error("Unbalanced time series data. Array size has %s elements, while time size has %s",
                         len(ts.data) / ts.second_axis_size, len(ts.times))
            raise StopTestError()

        with self.storage.get_fd(root_path, "cb") as fd:
            header = struct.pack(self.ts_header_format,
                                 ts.second_axis_size,
                                 len(ts.data),
                                 len(ts.times),
                                 cast(array.array, ts.data).typecode.encode("ascii"),
                                 cast(array.array, ts.times).typecode.encode("ascii"))
            fd.write(header)
            cast(array.array, ts.data).tofile(fd)
            cast(array.array, ts.times).tofile(fd)

        if ts.raw is not None:
            self.storage.put_raw(ts.raw, root_path + ":raw")

    def put_extra(self, job_root: str, node_id: str, key: str, data: bytes) -> None:
        """Store an arbitrary raw blob for a node under ``<node_id>_<key>`` in ``job_root``."""
        self.storage.put_raw(data, job_root, node_id + "_" + key)

    def list_suites(self) -> Iterator[Tuple[TestSuiteConfig, str]]:
        """Iterate over (suite_config, suite_root_path) for every suite directory in "results".

        The suite_root_path can be passed to list_jobs_in_suite.
        """
        ts_re = re.compile(r"[a-zA-Z]+_\d+$")
        for is_file, name in self.storage.list("results"):
            if not is_file:
                rr = ts_re.match(name)
                if rr:
                    path = "results/" + name
                    yield self.get_suite_config(path), path

    def list_jobs_in_suite(self, suite_root_path: str) -> Iterator[Tuple[TestJobConfig, str, int]]:
        """Iterate over (job_config, job_root_path, job_id) for the jobs stored in a suite.

        Directories without a ``config.yml`` are skipped.
        The job_root_path can be passed to list_ts_in_job.
        """
        ts_re = re.compile(r"(?P<job_summary>[a-zA-Z0-9]+)_(?P<id>\d+)$")
        for is_file, name in self.storage.list(suite_root_path):
            if is_file:
                continue
            rr = ts_re.match(name)
            if rr:
                config_path = "{}/{}/config.yml".format(suite_root_path, name)
                if config_path in self.storage:
                    cfg = self.storage.load(self.job_config_cls, config_path)
                    yield cfg, "{}/{}".format(suite_root_path, name), int(rr.group("id"))

    def list_ts_in_job(self, job_root_path: str) -> Iterator[Tuple[str, str, str]]:
        """Iterate over unique (node_id, device_name, sensor_name) found in a job directory.

        The yielded values can be passed to load_ts.
        """
        # TODO: check that all TS files available
        ts_re = re.compile(r"(?P<node_id>\d+\.\d+\.\d+\.\d+:\d+)_(?P<dev>[^.]+)\.(?P<sensor>[a-z_]+)$")
        already_found = set()
        for is_file, name in self.storage.list(job_root_path):
            if not is_file:
                continue
            rr = ts_re.match(name)
            if rr:
                key = (rr.group("node_id"), rr.group("dev"), rr.group("sensor"))
                if key not in already_found:
                    already_found.add(key)
                    yield key

    def load_ts(self, root_path: str, node_id: str, dev: str, sensor_name: str) -> TimeSeries:
        """Load a time series written by put_ts; the ``raw`` field is not restored."""
        path = self.get_ts_path(root_path, node_id, dev, sensor_name)

        with self.storage.get_fd(path, "rb") as fd:
            header = fd.read(struct.calcsize(self.ts_header_format))
            second_axis_size, data_sz, time_sz, data_typecode, time_typecode = \
                struct.unpack(self.ts_header_format, header)

            data = array.array(data_typecode.decode("ascii"))
            times = array.array(time_typecode.decode("ascii"))

            # element counts come from the header
            data.fromfile(fd, data_sz)
            times.fromfile(fd, time_sz)

        return TimeSeries("{}.{}".format(dev, sensor_name),
                          raw=None,
                          data=data,
                          times=times,
                          second_axis_size=second_axis_size)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300144
145
class PerfTest(metaclass=abc.ABCMeta):
    """Abstract root of all performance tests.

    Holds the suite config, the storage objects and a cooperative stop flag;
    subclasses implement ``run`` and ``format_for_console``.
    """
    name = None  # type: str
    max_retry = 3
    retry_time = 30
    job_config_cls = None  # type: type

    def __init__(self, storage: Storage, config: TestSuiteConfig, idx: int, on_idle: Callable[[], None] = None) -> None:
        """Store constructor arguments and build the ResultStorage helper.

        ``on_idle`` may be None; it is only kept here for subclasses to call.
        """
        self.config = config
        self.storage = storage
        self.idx = idx
        self.on_idle = on_idle
        self.stop_requested = False

        # ids of all suite nodes, kept in sorted order
        node_ids = [node.info.node_id() for node in self.config.nodes]
        node_ids.sort()
        self.sorted_nodes_ids = node_ids

        self.rstorage = ResultStorage(self.storage, self.job_config_cls)

    def request_stop(self) -> None:
        """Raise the stop flag."""
        self.stop_requested = True

    def join_remote(self, path: str) -> str:
        """Return ``path`` joined onto the suite's ``remote_dir``."""
        remote_dir = self.config.remote_dir
        return os.path.join(remote_dir, path)

    @abc.abstractmethod
    def run(self) -> None:
        """Execute the test; must be provided by subclasses."""

    @abc.abstractmethod
    def format_for_console(self, data: Any) -> str:
        """Render ``data`` as text; must be provided by subclasses."""
175
koder aka kdanilov4643fd62015-02-10 16:20:13 -0800176
class ThreadedTest(PerfTest, metaclass=abc.ABCMeta):
    """Base class for tests, which spawn separated thread for each node"""

    # max allowed time difference between starts and stops of run of the same test on different test nodes
    # used_max_diff = max((min_run_time * max_rel_time_diff), max_time_diff)
    max_time_diff = 5
    max_rel_time_diff = 0.05
    load_profile_name = None  # type: str

    def __init__(self, *args, **kwargs) -> None:
        """Initialize the base test and compute the suite root path in storage."""
        PerfTest.__init__(self, *args, **kwargs)
        self.job_configs = [None]  # type: List[Optional[TestJobConfig]]
        self.suite_root_path = self.rstorage.get_suite_root(self.config.test_type, self.idx)

    @abc.abstractmethod
    def get_expected_runtime(self, iter_cfg: TestJobConfig) -> Optional[int]:
        """Return the expected run time of a job, or None when it is unknown."""
        pass

    def get_not_done_stages(self) -> Iterable[Tuple[int, TestJobConfig]]:
        """Return (run_id, job_config) pairs of jobs which are not in storage yet.

        Raises StopTestError if a job found in storage has a config different
        from the one expected for the same run id.
        """
        all_jobs = dict(enumerate(self.job_configs))
        for db_config, path, jid in self.rstorage.list_jobs_in_suite(self.suite_root_path):
            if jid in all_jobs:
                job_config = all_jobs[jid]
                if job_config != db_config:
                    logger.error("Test info at path '%s/config' is not equal to expected config for iteration %s.%s." +
                                 " Maybe configuration was changed before test was restarted. " +
                                 "DB cfg is:\n    %s\nExpected cfg is:\n    %s\nFix DB or rerun test from beginning",
                                 path, self.name, job_config.summary,
                                 str(db_config).replace("\n", "\n    "),
                                 str(job_config).replace("\n", "\n    "))
                    raise StopTestError()

                logger.info("Test iteration %s.%s found in storage and will be skipped",
                            self.name, job_config.summary)
                del all_jobs[jid]
        return all_jobs.items()

    def _warn_on_time_skew(self, job_config: TestJobConfig, start_times: List[int], stop_times: List[int]) -> None:
        """Log a warning if start or stop times of one job differ too much between series.

        Allowed spread is max((min_stop - max_start) * max_rel_time_diff, max_time_diff),
        i.e. the formula given in the class-level comment.
        """
        min_start_time = min(start_times)
        max_start_time = max(start_times)
        min_stop_time = min(stop_times)
        max_stop_time = max(stop_times)

        max_allowed_time_diff = int((min_stop_time - max_start_time) * self.max_rel_time_diff)
        max_allowed_time_diff = max(max_allowed_time_diff, self.max_time_diff)

        start_diff = max_start_time - min_start_time
        if start_diff > max_allowed_time_diff:
            logger.warning("Too large difference in %s:%s start time - %s. " +
                           "Max recommended difference is %s",
                           self.name, job_config.summary,
                           start_diff, max_allowed_time_diff)

        stop_diff = max_stop_time - min_stop_time
        if stop_diff > max_allowed_time_diff:
            logger.warning("Too large difference in %s:%s stop time - %s. " +
                           "Max recommended difference is %s",
                           self.name, job_config.summary,
                           stop_diff, max_allowed_time_diff)

    def run(self) -> None:
        """Run all jobs which are not in storage yet, one thread per node, and store results.

        Raises StopTestError if the stored suite config differs from the current
        one, or if a job keeps failing with EnvironmentError after ``max_retry``
        attempts.
        """
        try:
            cfg = self.rstorage.get_suite_config(self.suite_root_path)
        except KeyError:
            cfg = None

        if cfg is not None and cfg != self.config:
            logger.error("Current suite %s config is not equal to found in storage at %s",
                         self.config.test_type, self.suite_root_path)
            raise StopTestError()

        not_in_storage = list(self.get_not_done_stages())

        if not not_in_storage:
            logger.info("All test iteration in storage already. Skip test")
            return

        self.rstorage.put_suite_config(self.config, self.suite_root_path)

        logger.debug("Run test %s with profile %r on nodes %s.", self.name,
                     self.load_profile_name,
                     ",".join(self.sorted_nodes_ids))
        logger.debug("Prepare nodes")

        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
            # config nodes
            list(pool.map(self.config_node, self.config.nodes))

            run_times = [self.get_expected_runtime(job_config) for _, job_config in not_in_storage]

            if None not in run_times:
                # +5% - is a rough estimation for additional operations
                expected_run_time = int(sum(run_times) * 1.05)

                exec_time_s, end_dt_s = get_time_interval_printable_info(expected_run_time)
                logger.info("Entire test should takes around %s and finished at %s", exec_time_s, end_dt_s)

            for run_id, job_config in not_in_storage:
                job_path = self.rstorage.get_job_root(self.suite_root_path, job_config.summary, run_id)

                results = []  # type: List[JobMetrics]
                for idx in range(self.max_retry):
                    logger.debug("Prepare job %s", job_config.summary)

                    # prepare nodes for new iterations
                    # NOTE(review): wait() does not re-raise exceptions from the futures,
                    # so a failure inside prepare_iteration goes unnoticed here
                    wait([pool.submit(self.prepare_iteration, node, job_config) for node in self.config.nodes])

                    # expected runtime may be unknown (None), same as for the whole suite above
                    expected_job_time = self.get_expected_runtime(job_config)
                    if expected_job_time is not None:
                        exec_time_s, end_dt_s = get_time_interval_printable_info(expected_job_time)
                        logger.info("Job should takes around %s and finished at %s", exec_time_s, end_dt_s)

                    try:
                        jfutures = []  # type: List[Future]
                        for node in self.config.nodes:
                            future = pool.submit(self.run_iteration, node, job_config, job_path)
                            jfutures.append(future)

                        # let every node finish before inspecting results, so a retry
                        # never overlaps with a still running iteration
                        wait(jfutures)

                        # result() re-raises an exception from run_iteration; it has to be
                        # called inside this 'try', otherwise a retry can never happen
                        results = [future.result() for future in jfutures]

                        # test completed successfully, stop retrying
                        break
                    except EnvironmentError:
                        if self.max_retry - 1 == idx:
                            logger.exception("Fio failed")
                            raise StopTestError()
                        logger.exception("During fio run")
                        logger.info("Sleeping %ss and retrying job", self.retry_time)
                        time.sleep(self.retry_time)

                start_times = []  # type: List[int]
                stop_times = []  # type: List[int]

                for node_metrics in results:
                    for (node_id, dev, sensor_name), ts in node_metrics.items():
                        self.rstorage.put_ts(ts, job_path, node_id=node_id, dev=dev, sensor_name=sensor_name)

                        if len(ts.times) >= 2:
                            start_times.append(ts.times[0])
                            stop_times.append(ts.times[-1])

                if start_times:
                    self._warn_on_time_skew(job_config, start_times, stop_times)

                # job config is written last: its presence marks the job as done
                # for get_not_done_stages on the next run
                self.rstorage.put_job_config(job_config, job_path)
                self.storage.sync()

                if self.on_idle is not None:
                    self.on_idle()

    @abc.abstractmethod
    def config_node(self, node: IRPCNode) -> None:
        """One-time node setup, called once per node before any job is run."""
        pass

    @abc.abstractmethod
    def prepare_iteration(self, node: IRPCNode, iter_config: TestJobConfig) -> None:
        """Prepare a node for the next attempt of the given job."""
        pass

    @abc.abstractmethod
    def run_iteration(self, node: IRPCNode, iter_config: TestJobConfig, stor_prefix: str) -> JobMetrics:
        """Run a job on a node and return a mapping (node_id, dev, sensor_name) -> time series."""
        pass
330
331
class TwoScriptTest(ThreadedTest, metaclass=abc.ABCMeta):
    """Test driven by two scripts from the suite params: a prerun script and a run script."""

    def __init__(self, *dt, **mp) -> None:
        """Read script paths and timeouts (default 3600) from ``config.params``."""
        ThreadedTest.__init__(self, *dt, **mp)
        params = self.config.params
        self.prerun_script = params['prerun_script']
        self.run_script = params['run_script']
        self.prerun_tout = params.get('prerun_tout', 3600)
        self.run_tout = params.get('run_tout', 3600)
        self.iterations_configs = [None]

    def get_expected_runtime(self, iter_cfg: TestJobConfig) -> Optional[int]:
        """Run time of a script is not known in advance, so always None."""
        return None

    def config_node(self, node: IRPCNode) -> None:
        """Copy both scripts to the node, then execute the prerun script there."""
        for script in (self.run_script, self.prerun_script):
            node.copy_file(script, self.join_remote(script))

        prerun_cmd = self.join_remote(self.prerun_script) + ' ' + self.config.params.get('prerun_opts', '')
        node.run(prerun_cmd, timeout=self.prerun_tout)

    def prepare_iteration(self, node: IRPCNode, iter_config: TestJobConfig) -> None:
        """Nothing to prepare between iterations."""

    def run_iteration(self, node: IRPCNode, iter_config: TestJobConfig, stor_prefix: str) -> JobMetrics:
        """Execute the run script on the node and parse what ``node.run`` returned."""
        # TODO: have to store logs
        run_cmd = self.join_remote(self.run_script) + ' ' + self.config.params.get('run_opts', '')
        output = node.run(run_cmd, timeout=self.run_tout)
        return self.parse_results(output)

    @abc.abstractmethod
    def parse_results(self, data: str) -> JobMetrics:
        """Convert run script output into job metrics; provided by subclasses."""
364