blob: ad5e8a52d3fc672c799ce82b37c273202edb18de [file] [log] [blame]
koder aka kdanilov4643fd62015-02-10 16:20:13 -08001import abc
koder aka kdanilov66839a92015-04-11 13:22:31 +03002import time
koder aka kdanilov783b4542015-04-23 18:57:04 +03003import socket
koder aka kdanilov4d4771c2015-04-23 01:32:02 +03004import random
koder aka kdanilov4643fd62015-02-10 16:20:13 -08005import os.path
koder aka kdanilove21d7472015-02-14 19:02:04 -08006import logging
koder aka kdanilovea22c3d2015-04-21 03:42:22 +03007import datetime
koder aka kdanilove21d7472015-02-14 19:02:04 -08008
koder aka kdanilov783b4542015-04-23 18:57:04 +03009from paramiko import SSHException
10
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030011from wally.utils import ssize_to_b, open_for_append_or_create, sec_to_str
12
koder aka kdanilov4d4771c2015-04-23 01:32:02 +030013from wally.ssh_utils import (copy_paths, run_over_ssh,
koder aka kdanilovabd6ead2015-04-24 02:03:07 +030014 save_to_remote,
koder aka kdanilov4d4771c2015-04-23 01:32:02 +030015 # delete_file,
16 connect, read_from_remote, Local)
17
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030018from . import postgres
19from .io import agent as io_agent
20from .io import formatter as io_formatter
21from .io.results_loader import parse_output
koder aka kdanilov652cd802015-04-13 12:21:07 +030022
koder aka kdanilov4643fd62015-02-10 16:20:13 -080023
# Module-level logger: every test class in this file logs through the
# shared "wally" channel.
logger = logging.getLogger("wally")
koder aka kdanilove21d7472015-02-14 19:02:04 -080025
26
class IPerfTest(object):
    """Common base for performance tests that are executed on one node.

    The constructor only stores its arguments; the life-cycle hooks
    (``pre_run``, ``run``, ``cleanup``) are no-ops here and are meant to be
    overridden by concrete tests.

    NOTE: ``run`` is decorated with ``abc.abstractmethod``, but this class
    does not use ``abc.ABCMeta`` as its metaclass, so the decorator is not
    enforced and the class can still be instantiated directly.
    """

    def __init__(self, options, is_primary, on_result_cb, test_uuid, node,
                 log_directory=None,
                 coordination_queue=None,
                 remote_dir="/tmp/wally"):
        # what to run and who we are
        self.test_uuid = test_uuid
        self.options = options
        self.is_primary = is_primary

        # where to run it
        self.node = node
        self.remote_dir = remote_dir

        # where results and coordination messages are delivered
        self.on_result_cb = on_result_cb
        self.log_directory = log_directory
        self.coordination_queue = coordination_queue

    def join_remote(self, path):
        """Return ``path`` placed under this test's remote directory."""
        base = self.remote_dir
        return os.path.join(base, path)

    def coordinate(self, data):
        """Put ``data`` on the coordination queue; no-op without a queue."""
        queue = self.coordination_queue
        if queue is None:
            return
        queue.put(data)

    def pre_run(self):
        """Hook executed before ``run``; does nothing by default."""
        pass

    def cleanup(self):
        """Hook executed after the test; does nothing by default."""
        pass

    @abc.abstractmethod
    def run(self, barrier):
        """Execute the test; concrete tests override this."""
        pass

    @classmethod
    def format_for_console(cls, data):
        """Format results for console output; must be overridden.

        Raises:
            NotImplementedError: always, naming the class that lacks
                the implementation.
        """
        raise NotImplementedError(
            "{0}.format_for_console".format(cls.__name__))

    def run_over_ssh(self, cmd, **kwargs):
        """Run ``cmd`` through this node's connection.

        Extra keyword arguments are forwarded unchanged to the module-level
        ``run_over_ssh`` helper, whose result is returned.
        """
        target = self.node
        return run_over_ssh(target.connection, cmd,
                            node=target.get_conn_id(), **kwargs)

    @classmethod
    def coordination_th(cls, coord_q, barrier, num_threads):
        """Coordination thread entry point; does nothing by default."""
        pass
70
koder aka kdanilov4643fd62015-02-10 16:20:13 -080071
class TwoScriptTest(IPerfTest):
    """Test driven by two scripts: a preparation script and a run script.

    Each script is copied into the test's remote directory and executed over
    SSH.  The output of the run script is parsed as ``key:value`` lines with
    numeric values, and every pair is reported through ``on_result_cb``.

    The script paths come from the ``prepare_script`` / ``run_script``
    attributes (a subclass may define them as class attributes); the
    ``run_script`` and ``prepare_script`` options override them.
    """

    def __init__(self, *dt, **mp):
        IPerfTest.__init__(self, *dt, **mp)

        # The two overrides go together: when 'run_script' is given,
        # 'prepare_script' is required as well (KeyError otherwise).
        if 'run_script' in self.options:
            self.run_script = self.options['run_script']
            self.prepare_script = self.options['prepare_script']

    def get_remote_for_script(self, script):
        """Return the remote path for local ``script`` (basename is kept)."""
        # Fixed: the original read self.tmp_dir, an attribute nothing sets;
        # the base class stores the target directory as remote_dir.
        return self.join_remote(script.rpartition('/')[2])

    def copy_script(self, src):
        """Copy local file ``src`` to the node and return its remote path."""
        remote_path = self.get_remote_for_script(src)
        copy_paths(self.node.connection, {src: remote_path})
        return remote_path

    def pre_run(self):
        """Upload and execute the preparation script."""
        # Fixed: copy_script() takes only the source path (the connection
        # is taken from self.node), and the attribute is prepare_script,
        # not pre_run_script.
        remote_script = self.copy_script(self.prepare_script)
        self.run_over_ssh(remote_script)

    def run(self, barrier):
        """Upload the run script, execute it and report its results.

        Every option is appended to the command line as ``key value``.
        ``barrier`` is accepted for interface compatibility and not used.
        """
        # Fixed: copy_script() takes only the source path.
        remote_script = self.copy_script(self.run_script)
        cmd_opts = ' '.join(["%s %s" % (key, val) for key, val
                             in self.options.items()])
        cmd = remote_script + ' ' + cmd_opts
        out_err = self.run_over_ssh(cmd)
        self.on_result(out_err, cmd)

    def parse_results(self, out):
        """Report every ``key:value`` line of ``out`` via ``on_result_cb``.

        Lines without a ``:`` or with an empty key or value are skipped.

        Raises:
            ValueError: if a value cannot be converted to ``float``.
        """
        for line in out.split("\n"):
            key, separator, value = line.partition(":")
            if key and value:
                self.on_result_cb((key, float(value)))

    def on_result(self, out_err, cmd):
        """Parse the script output; wrap any failure in ``RuntimeError``.

        ``cmd`` is accepted for interface compatibility and not used.
        """
        try:
            self.parse_results(out_err)
        except Exception as exc:
            msg_templ = "Error during postprocessing results: {0!s}. {1}"
            raise RuntimeError(msg_templ.format(exc, out_err))
Yulia Portnova7ddfa732015-02-24 17:32:58 +0200114
115
class PgBenchTest(TwoScriptTest):
    """Two-script test whose scripts live next to the ``postgres`` package."""

    # directory that holds the postgres helper scripts
    root = os.path.dirname(postgres.__file__)

    # script executed by pre_run()
    prepare_script = os.path.join(root, "prepare.sh")

    # script executed by run()
    run_script = os.path.join(root, "run.sh")
koder aka kdanilov4500a5f2015-04-17 16:55:17 +0300120
121
class IOPerfTest(IPerfTest):
    """IO test that runs the io agent on the node inside a detached screen.

    ``pre_run`` prepares the node (remote directory, required packages, agent
    script, optional prefill of the test files).  ``run`` uploads the task
    file, starts the agent in a ``screen`` session, polls the node until the
    agent's pid file is gone, then downloads and parses the result log.
    """

    # seconds allowed for establishing a connection while polling the node
    tcp_conn_timeout = 30
    # seconds the agent has to create its pid file after being started
    max_pig_timeout = 30

    def __init__(self, *dt, **mp):
        """Read the task config and write its compiled form to the log dir.

        Reads the options ``cfg`` (required: path to the task config),
        ``alive_check_interval``, ``params`` and ``tool`` (default ``fio``).
        """
        IPerfTest.__init__(self, *dt, **mp)
        self.config_fname = self.options['cfg']
        self.alive_check_interval = self.options.get('alive_check_interval')
        self.config_params = self.options.get('params', {})
        self.tool = self.options.get('tool', 'fio')

        # Fixed: close the config file instead of leaking the handle.
        with open(self.config_fname) as cfg_fd:
            self.raw_cfg = cfg_fd.read()

        self.configs = list(io_agent.parse_all_in_1(self.raw_cfg,
                                                    self.config_params))

        cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
        raw_res = os.path.join(self.log_directory, "raw_results.txt")

        self.io_py_remote = self.join_remote("agent.py")
        self.log_fl = self.join_remote("log.txt")
        self.pid_file = self.join_remote("pid")
        self.task_file = self.join_remote("task.cfg")

        # Fixed: the compiled-config log is written once and was never
        # closed, so its content could stay unflushed.
        fio_command_file = open_for_append_or_create(cmd_log)
        try:
            cfg_s_it = io_agent.compile_all_in_1(self.raw_cfg,
                                                 self.config_params)
            splitter = "\n\n" + "-" * 60 + "\n\n"
            fio_command_file.write(splitter.join(cfg_s_it))
        finally:
            fio_command_file.close()

        # Stays open on purpose: stored as an attribute for later use.
        self.fio_raw_results_file = open_for_append_or_create(raw_res)

    def cleanup(self):
        # TODO: remove the temporary files used for testing, e.g.
        # delete_file(conn, self.io_py_remote)
        pass

    def prefill_test_files(self):
        """Fill every test file with zeros using ``dd`` on the node.

        When several sections share a file name the largest size wins.  The
        measured fill bandwidth is logged and sent to the coordination queue
        as ``('init_bw', <MiBps>)``.
        """
        mib = 1024 ** 2
        files = {}

        for section in self.configs:
            sz = ssize_to_b(section.vals['size'])

            # Fixed: size in MiB rounded up, using floor division so the
            # value stays an integer under true division as well (a float
            # would produce an invalid ``dd count=``).
            msz = sz // mib
            if sz % mib != 0:
                msz += 1

            fname = section.vals['filename']

            # if another test already uses the same file name,
            # take the largest size
            files[fname] = max(files.get(fname, 0), msz)

        cmd_templ = "dd oflag=direct " + \
                    "if=/dev/zero of={0} bs={1} count={2}"

        if self.options.get("use_sudo", True):
            cmd_templ = "sudo " + cmd_templ

        ssize = 0
        stime = time.time()

        for fname, curr_sz in files.items():
            cmd = cmd_templ.format(fname, mib, curr_sz)
            ssize += curr_sz
            # the timeout (seconds) equals the file size in MiB
            self.run_over_ssh(cmd, timeout=curr_sz)

        ddtime = time.time() - stime
        if ddtime > 1E-3:
            fill_bw = int(ssize / ddtime)
            mess = "Initiall dd fill bw is {0} MiBps for this vm"
            logger.info(mess.format(fill_bw))
            # reported only here: fill_bw does not exist on the other path
            self.coordinate(('init_bw', fill_bw))

    def install_utils(self, max_retry=3, timeout=5):
        """Install ``fio`` and ``screen`` on the node when they are missing.

        Args:
            max_retry: number of ``apt-get install`` attempts.
            timeout: seconds to sleep after a failed attempt.

        Raises:
            OSError: if every installation attempt failed.
        """
        need_install = []
        for bin_name, package in (('fio', 'fio'), ('screen', 'screen')):
            try:
                self.run_over_ssh('which ' + bin_name, nolog=True)
            except OSError:
                need_install.append(package)

        if not need_install:
            return

        cmd = "sudo apt-get -y install " + " ".join(need_install)

        # Fixed: keep the last error in a separate name.  The ``as`` target
        # is unbound after the except clause on Python 3 and is never bound
        # when max_retry is 0, so the else branch must not read it.
        last_err = None
        for _ in range(max_retry):
            try:
                self.run_over_ssh(cmd)
                break
            except OSError as err:
                last_err = err
                time.sleep(timeout)
        else:
            raise OSError("Can't install - " + str(last_err))

    def pre_run(self):
        """Prepare the node: directory, packages, agent and test files."""
        try:
            cmd = 'mkdir -p "{0}"'.format(self.remote_dir)
            if self.options.get("use_sudo", True):
                cmd = "sudo " + cmd
                # a directory created via sudo is handed back to the
                # connection's user
                cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
                                                      self.remote_dir)

            self.run_over_ssh(cmd)
        except Exception as exc:
            msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
            msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
            logger.error(msg)
            raise

        self.install_utils()

        # always upload the source file, even when the module object
        # points at a compiled file
        local_fname = os.path.splitext(io_agent.__file__)[0] + ".py"
        files_to_copy = {local_fname: self.io_py_remote}
        copy_paths(self.node.connection, files_to_copy)

        if self.options.get('prefill_files', True):
            self.prefill_test_files()
        elif self.is_primary:
            logger.warning("Prefilling of test files is disabled")

    def get_test_status(self):
        """Probe the node over a fresh connection.

        Returns:
            Tuple ``(is_connected, has_pid_file, pid, err)``.  When the node
            cannot be reached ``is_connected`` is False, ``has_pid_file`` and
            ``pid`` stay None and ``err`` holds the error text.
        """
        is_connected = None
        has_pid_file = None
        pid = None
        err = None

        try:
            conn = connect(self.node.conn_url,
                           conn_timeout=self.tcp_conn_timeout)
            with conn:
                with conn.open_sftp() as sftp:
                    try:
                        pid = read_from_remote(sftp, self.pid_file)
                        has_pid_file = True
                    except (NameError, IOError):
                        # NOTE(review): NameError is kept from the original
                        # code; presumably raised by read_from_remote for a
                        # missing file - confirm.
                        pid = None
                        has_pid_file = False

            is_connected = True

        except (socket.error, SSHException, EOFError) as exc:
            err = str(exc)
            is_connected = False

        return is_connected, has_pid_file, pid, err

    def wait_till_finished(self, timeout):
        """Poll the node until the test is finished or ``timeout`` expires.

        The test counts as finished when the pid file was seen at least once
        and is gone afterwards.  Returns silently when ``timeout`` seconds
        pass first.

        Raises:
            RuntimeError: if the pid file never shows up within
                ``max_pig_timeout`` seconds.
        """
        conn_id = self.node.get_conn_id()
        end_of_wait_time = timeout + time.time()

        # time_till_check = random.randint(30, 90)
        time_till_check = 5
        # Last pid read from the node.  Fixed: the original overwrote this
        # on every poll, so "not started yet", "already finished" and "node
        # unreachable" could not be told apart.
        pid = None
        pid_get_timeout = self.max_pig_timeout + time.time()
        curr_connected = True

        while end_of_wait_time > time.time():
            time.sleep(time_till_check)

            is_connected, has_pid_file, new_pid, err = self.get_test_status()

            if new_pid is not None:
                pid = new_pid

            # Only a successful probe says anything about the pid file; a
            # lost connection must not be taken for a missing file.
            if is_connected and not has_pid_file:
                if pid is not None:
                    # the pid file existed before and is gone now:
                    # execution finished
                    break

                # NOTE: a test that starts and finishes between two polls
                # is never observed and ends up here as well.
                if time.time() > pid_get_timeout:
                    msg = ("On node {0} pid file doesn't " +
                           "appears in time")
                    logger.error(msg.format(conn_id))
                    raise RuntimeError("Start timeout")

            if is_connected and not curr_connected:
                msg = "Connection with {0} is restored"
                logger.debug(msg.format(conn_id))
            elif not is_connected and curr_connected:
                msg = "Lost connection with {0}. Error: {1}"
                logger.debug(msg.format(conn_id, err))

            curr_connected = is_connected

    def run(self, barrier):
        """Start the agent in a screen session, wait for it, parse results.

        ``barrier.wait()`` is called right before the agent is started and
        ``barrier.exit()`` is always called once this node is done, whether
        the test succeeded or not.
        """
        conn_id = self.node.get_conn_id()

        cmd_templ = "screen -S {screen_name} -d -m " + \
                    "env python2 {0} -p {pid_file} -o {results_file} " + \
                    "--type {1} {2} --json {3}"

        if self.options.get("use_sudo", True):
            cmd_templ = "sudo " + cmd_templ

        params = " ".join("{0}={1}".format(k, v)
                          for k, v in self.config_params.items())

        if "" != params:
            params = "--params " + params

        with self.node.connection.open_sftp() as sftp:
            save_to_remote(sftp,
                           self.task_file, self.raw_cfg)

        screen_name = self.test_uuid
        cmd = cmd_templ.format(self.io_py_remote,
                               self.tool,
                               params,
                               self.task_file,
                               pid_file=self.pid_file,
                               results_file=self.log_fl,
                               screen_name=screen_name)
        msg = "Thread for node {0} is waiting on barrier"
        logger.debug(msg.format(conn_id))
        exec_time = io_agent.calculate_execution_time(self.configs)
        exec_time_str = sec_to_str(exec_time)

        try:
            # wait at most twice the estimated time plus five minutes
            timeout = int(exec_time * 2 + 300)
            barrier.wait()

            if self.is_primary:
                templ = "Test should takes about {0}." + \
                        " Should finish at {1}," + \
                        " will wait at most till {2}"
                now_dt = datetime.datetime.now()
                end_dt = now_dt + datetime.timedelta(0, exec_time)
                wait_till = now_dt + datetime.timedelta(0, timeout)

                logger.info(templ.format(exec_time_str,
                                         end_dt.strftime("%H:%M:%S"),
                                         wait_till.strftime("%H:%M:%S")))

            self.run_over_ssh(cmd)

            msg = "Test on node {0} started in screen {1}"
            logger.debug(msg.format(conn_id, screen_name))

            # TODO: add monitoring socket
            # The main connection is dropped while the test runs; polling
            # opens short-lived connections of its own.
            if self.node.connection is not Local:
                self.node.connection.close()

            self.wait_till_finished(timeout)
            logger.debug("Done")

            if self.node.connection is not Local:
                conn_timeout = self.tcp_conn_timeout * 3
                self.node.connection = connect(self.node.conn_url,
                                               conn_timeout=conn_timeout)

            with self.node.connection.open_sftp() as sftp:
                # try to reboot and then connect
                out_err = read_from_remote(sftp, self.log_fl)

        finally:
            barrier.exit()

        self.on_result(out_err, cmd)

    def on_result(self, out_err, cmd):
        """Parse the agent output and report every item via the callback.

        ``OSError`` passes through unchanged; any other failure is wrapped
        in ``RuntimeError``.  ``cmd`` is accepted for interface compatibility
        and not used.
        """
        try:
            for data in parse_output(out_err):
                self.on_result_cb(data)
        except OSError:
            raise
        except Exception as exc:
            msg_templ = "Error during postprocessing results: {0!s}"
            raise RuntimeError(msg_templ.format(exc))

    def merge_results(self, results):
        """Merge the results of several runs of the same test into one.

        The first element of ``results`` is updated in place and returned:
        the ``bw``, ``clat``, ``iops``, ``lat`` and ``slat`` lists are
        extended with the values of the other runs, every other field has to
        be equal across runs.

        Returns:
            The merged first result, or None when ``results`` is empty.

        Raises:
            AssertionError: if the runs differ in metadata, test names,
                field names or in a field that cannot be merged.
        """
        if len(results) == 0:
            return None

        merged_result = results[0]
        merged_data = merged_result['res']
        expected_keys = set(merged_data.keys())
        mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']

        for res in results[1:]:
            assert res['__meta__'] == merged_result['__meta__']

            data = res['res']
            diff = set(data.keys()).symmetric_difference(expected_keys)

            msg = "Difference: {0}".format(",".join(diff))
            assert len(diff) == 0, msg

            for testname, test_data in data.items():
                res_test_data = merged_data[testname]

                diff = set(test_data.keys()).symmetric_difference(
                    res_test_data.keys())

                msg = "Difference: {0}".format(",".join(diff))
                assert len(diff) == 0, msg

                for k, v in test_data.items():
                    if k in mergable_fields:
                        res_test_data[k].extend(v)
                    else:
                        msg = "{0!r} != {1!r}".format(res_test_data[k], v)
                        assert res_test_data[k] == v, msg

        return merged_result

    @classmethod
    def format_for_console(cls, data):
        """Format results for console output using the io formatter."""
        return io_formatter.format_results_for_console(data)