blob: 57fd3e7085986c9904c2aa3e2254e2c79a4f05c1 [file] [log] [blame]
koder aka kdanilov4643fd62015-02-10 16:20:13 -08001import abc
koder aka kdanilov66839a92015-04-11 13:22:31 +03002import time
koder aka kdanilov783b4542015-04-23 18:57:04 +03003import socket
koder aka kdanilov4d4771c2015-04-23 01:32:02 +03004import random
koder aka kdanilov4643fd62015-02-10 16:20:13 -08005import os.path
koder aka kdanilove21d7472015-02-14 19:02:04 -08006import logging
koder aka kdanilovea22c3d2015-04-21 03:42:22 +03007import datetime
koder aka kdanilove21d7472015-02-14 19:02:04 -08008
koder aka kdanilov783b4542015-04-23 18:57:04 +03009from paramiko import SSHException
10
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030011from wally.utils import ssize_to_b, open_for_append_or_create, sec_to_str
12
koder aka kdanilov4d4771c2015-04-23 01:32:02 +030013from wally.ssh_utils import (copy_paths, run_over_ssh,
14 save_to_remote, ssh_mkdir,
15 # delete_file,
16 connect, read_from_remote, Local)
17
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030018from . import postgres
19from .io import agent as io_agent
20from .io import formatter as io_formatter
21from .io.results_loader import parse_output
koder aka kdanilov652cd802015-04-13 12:21:07 +030022
koder aka kdanilov4643fd62015-02-10 16:20:13 -080023
# Module-level logger registered under the shared "wally" name.
logger = logging.getLogger(name="wally")
koder aka kdanilove21d7472015-02-14 19:02:04 -080025
26
class IPerfTest(object):
    """Base class for a performance test bound to a single node.

    Subclasses override the ``pre_run`` / ``run`` / ``cleanup`` hooks and
    report results through ``on_result_cb``. ``run`` is decorated with
    ``abc.abstractmethod``, but this class does not use ``abc.ABCMeta``, so
    the decoration is documentary only and is not enforced.
    """

    def __init__(self, on_result_cb, test_uuid, node,
                 log_directory=None, coordination_queue=None):
        self.on_result_cb = on_result_cb
        self.log_directory = log_directory
        self.node = node
        self.test_uuid = test_uuid
        self.coordination_queue = coordination_queue

    def coordinate(self, data):
        """Push ``data`` onto the coordination queue; no-op without a queue."""
        queue = self.coordination_queue
        if queue is None:
            return
        queue.put(data)

    def pre_run(self):
        """Hook called before ``run``; does nothing by default."""

    def cleanup(self):
        """Hook called after the test; does nothing by default."""

    @abc.abstractmethod
    def run(self, barrier):
        """Execute the test; must be provided by subclasses."""

    @classmethod
    def format_for_console(cls, data):
        """Render results for console output; subclasses must override."""
        raise NotImplementedError(
            "{0}.format_for_console".format(cls.__name__))

    def run_over_ssh(self, cmd, **kwargs):
        """Run ``cmd`` on this test's node via the module-level helper."""
        node = self.node
        return run_over_ssh(node.connection, cmd,
                            node=node.get_conn_id(), **kwargs)

    @classmethod
    def coordination_th(cls, coord_q, barrier, num_threads):
        """Coordination thread body; does nothing by default."""
62
koder aka kdanilov4643fd62015-02-10 16:20:13 -080063
class TwoScriptTest(IPerfTest):
    """Test driven by two scripts: a prepare script and a run script.

    Both scripts are copied into ``remote_tmp_dir`` on the node and executed
    there. The run script receives every ``opts`` item as ``"key value"``
    arguments. Its output is parsed line by line as ``key:value`` pairs, and
    each value is converted with ``float``.
    """
    remote_tmp_dir = '/tmp'

    def __init__(self, opts, *dt, **mp):
        IPerfTest.__init__(self, *dt, **mp)
        self.opts = opts

        # Per-instance override of the (subclass) class-level script paths;
        # when 'run_script' is given, 'prepare_script' must be given too.
        if 'run_script' in self.opts:
            self.run_script = self.opts['run_script']
            self.prepare_script = self.opts['prepare_script']

    def get_remote_for_script(self, script):
        """Return the remote path for ``script`` inside ``remote_tmp_dir``."""
        return os.path.join(self.remote_tmp_dir, script.rpartition('/')[2])

    def copy_script(self, src):
        """Copy local ``src`` to the node and return its remote path."""
        remote_path = self.get_remote_for_script(src)
        copy_paths(self.node.connection, {src: remote_path})
        return remote_path

    def pre_run(self):
        """Copy and execute the prepare script on the node."""
        remote_script = self.copy_script(self.prepare_script)
        self.run_over_ssh(remote_script)

    def run(self, barrier):
        """Copy and execute the run script, then parse its output.

        ``barrier`` is accepted for interface compatibility and is unused.
        """
        remote_script = self.copy_script(self.run_script)
        cmd_opts = ' '.join("%s %s" % (key, val)
                            for key, val in self.opts.items())
        cmd = remote_script + ' ' + cmd_opts
        out_err = self.run_over_ssh(cmd)
        self.on_result(out_err, cmd)

    def parse_results(self, out):
        """Report every ``key:value`` line of ``out`` as ``(key, float)``."""
        for line in out.split("\n"):
            key, separator, value = line.partition(":")
            if key and value:
                self.on_result_cb((key, float(value)))

    def on_result(self, out_err, cmd):
        """Parse ``out_err``; wrap any parse failure in RuntimeError."""
        try:
            self.parse_results(out_err)
        except Exception as exc:
            msg_templ = "Error during postprocessing results: {0!s}. {1}"
            raise RuntimeError(msg_templ.format(exc, out_err))
Yulia Portnova7ddfa732015-02-24 17:32:58 +0200109
110
class PgBenchTest(TwoScriptTest):
    """Two-script test whose prepare.sh and run.sh live in the directory of
    the ``postgres`` package; ``opts['run_script']`` can still override them.
    """
    root = os.path.dirname(postgres.__file__)
    prepare_script, run_script = (os.path.join(root, "prepare.sh"),
                                  os.path.join(root, "run.sh"))
koder aka kdanilov4500a5f2015-04-17 16:55:17 +0300115
116
class IOPerfTest(IPerfTest):
    """Disk I/O test executed by the io agent script on a node.

    ``pre_run`` does the setup:
      * copies the agent to the node;
      * makes sure ``fio`` is available;
      * optionally pre-fills the test files with ``dd``.

    ``run`` starts the agent in a detached ``screen`` session and polls the
    agent's pid file until it disappears. It then reads the agent's output
    file back and feeds it to ``on_result``.
    """

    # Remote paths; each template is formatted with the test uuid.
    io_py_remote_templ = "/tmp/test/{0}/io/disk_test_agent.py"
    log_fl_templ = "/tmp/test/{0}/io/disk_test_agent_log.txt"
    pid_file_templ = "/tmp/test/{0}/io/disk_test_agent_pid_file"
    task_file_templ = "/tmp/test/{0}/io/io_task.cfg"

    def __init__(self, test_options, *dt, **mp):
        """Load and compile the job config.

        ``test_options`` requires a 'cfg' key (local path to the job config).
        It optionally reads 'alive_check_interval', 'params', 'tool',
        'prefill_files' and 'use_sudo'. All remaining arguments are forwarded
        to ``IPerfTest.__init__``. ``log_directory`` must be set, because the
        compiled config and raw results files are created inside it.
        """
        IPerfTest.__init__(self, *dt, **mp)
        self.options = test_options
        self.config_fname = test_options['cfg']
        self.alive_check_interval = test_options.get('alive_check_interval')
        self.config_params = test_options.get('params', {})
        self.tool = test_options.get('tool', 'fio')

        # Read the config without leaking the file descriptor.
        with open(self.config_fname) as cfg_fd:
            self.raw_cfg = cfg_fd.read()

        self.configs = list(io_agent.parse_all_in_1(self.raw_cfg,
                                                    self.config_params))

        cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
        raw_res = os.path.join(self.log_directory, "raw_results.txt")

        self.io_py_remote = self.io_py_remote_templ.format(self.test_uuid)
        self.log_fl = self.log_fl_templ.format(self.test_uuid)
        self.pid_file = self.pid_file_templ.format(self.test_uuid)
        self.task_file = self.task_file_templ.format(self.test_uuid)

        # Dump every compiled job section for later inspection, and close the
        # file so the content is flushed.
        fio_command_file = open_for_append_or_create(cmd_log)
        try:
            cfg_s_it = io_agent.compile_all_in_1(self.raw_cfg,
                                                 self.config_params)
            splitter = "\n\n" + "-" * 60 + "\n\n"
            fio_command_file.write(splitter.join(cfg_s_it))
        finally:
            fio_command_file.close()

        self.fio_raw_results_file = open_for_append_or_create(raw_res)

    def cleanup(self):
        # delete_file(conn, self.io_py_remote)
        # TODO: remove temporary files used for testing
        pass

    def pre_run(self):
        """Prepare the node: remote dir, fio, agent script, test files."""
        with self.node.connection.open_sftp() as sftp:
            ssh_mkdir(sftp,
                      os.path.dirname(self.io_py_remote),
                      intermediate=True)

        self._ensure_fio_installed()

        # Copy the .py source even if the agent was imported from a .pyc.
        local_fname = os.path.splitext(io_agent.__file__)[0] + ".py"

        self.files_to_copy = {
            local_fname: self.io_py_remote,
        }

        copy_paths(self.node.connection, self.files_to_copy)

        if self.options.get('prefill_files', True):
            self._prefill_test_files()
        else:
            logger.warning("Test files prefill disabled")

    def _ensure_fio_installed(self, attempts=3, retry_delay=3):
        """Install fio via apt-get if ``which fio`` fails; raise OSError."""
        try:
            self.run_over_ssh('which fio')
            return
        except OSError:
            pass

        # TODO: support distributions without apt-get
        # Keep the last error explicitly: an ``except ... as`` name is
        # unbound after its block on Python 3.
        last_err = None
        for _ in range(attempts):
            try:
                self.run_over_ssh("sudo apt-get -y install fio")
                return
            except OSError as err:
                last_err = err
                time.sleep(retry_delay)

        raise OSError("Can't install fio - " + str(last_err))

    def _prefill_test_files(self):
        """Write every test file once with dd, using its largest size."""
        mib = 1024 ** 2
        files = {}

        for section in self.configs:
            sz = ssize_to_b(section.vals['size'])
            # Round up to whole MiB; '//' keeps the count integral on both
            # Python 2 and Python 3.
            msz = sz // mib
            if sz % mib != 0:
                msz += 1

            fname = section.vals['filename']

            # If several sections share a file name, take the largest size.
            files[fname] = max(files.get(fname, 0), msz)

        cmd_templ = "dd oflag=direct " + \
                    "if=/dev/zero of={0} bs={1} count={2}"

        if self.options.get("use_sudo", True):
            cmd_templ = "sudo " + cmd_templ

        ssize = 0
        stime = time.time()

        for fname, curr_sz in files.items():
            cmd = cmd_templ.format(fname, mib, curr_sz)
            ssize += curr_sz
            self.run_over_ssh(cmd, timeout=curr_sz)

        ddtime = time.time() - stime
        if ddtime > 1E-3:
            fill_bw = int(ssize / ddtime)
            mess = "Initiall dd fill bw is {0} MiBps for this vm"
            logger.info(mess.format(fill_bw))

    def run(self, barrier):
        """Start the agent and collect its results.

        ``barrier.wait()`` gates the start. A falsy return raises
        RuntimeError, and ``barrier.exit()`` is always called afterwards.

        Raises RuntimeError in three cases:
          * the barrier wait fails;
          * the pid file does not appear within ~30 seconds;
          * the results cannot be parsed.
        """
        cmd_templ = "screen -S {screen_name} -d -m " + \
                    "env python2 {0} -p {pid_file} -o {results_file} " + \
                    "--type {1} {2} --json {3}"

        if self.options.get("use_sudo", True):
            cmd_templ = "sudo " + cmd_templ

        params = " ".join("{0}={1}".format(k, v)
                          for k, v in self.config_params.items())

        if params:
            params = "--params " + params

        with self.node.connection.open_sftp() as sftp:
            save_to_remote(sftp, self.task_file, self.raw_cfg)

        screen_name = self.test_uuid
        cmd = cmd_templ.format(self.io_py_remote,
                               self.tool,
                               params,
                               self.task_file,
                               pid_file=self.pid_file,
                               results_file=self.log_fl,
                               screen_name=screen_name)
        logger.debug("Waiting on barrier")

        exec_time = io_agent.calculate_execution_time(self.configs)
        exec_time_str = sec_to_str(exec_time)
        tcp_conn_timeout = 30

        try:
            timeout = int(exec_time * 2 + 300)

            if not barrier.wait():
                # Without this check the code below would reach on_result()
                # with ``out_err`` never assigned.
                msg = "Barrier wait failed on node {0} - test is not started"
                raise RuntimeError(msg.format(self.node.get_conn_id()))

            templ = "Test should takes about {0}." + \
                    " Should finish at {1}," + \
                    " will wait at most till {2}"
            now_dt = datetime.datetime.now()
            end_dt = now_dt + datetime.timedelta(0, exec_time)
            wait_till = now_dt + datetime.timedelta(0, timeout)

            logger.info(templ.format(exec_time_str,
                                     end_dt.strftime("%H:%M:%S"),
                                     wait_till.strftime("%H:%M:%S")))

            self.run_over_ssh(cmd)
            logger.debug("Test started in screen {0}".format(screen_name))

            end_of_wait_time = timeout + time.time()

            # TODO: add monitoring socket
            # Drop the long-lived connection while polling; the poll loop
            # opens a short-lived connection for every check.
            if self.node.connection is not Local:
                self.node.connection.close()

            self._wait_for_completion(end_of_wait_time, tcp_conn_timeout)
            logger.debug("Done")

            if self.node.connection is not Local:
                self.node.connection = connect(
                    self.node.conn_url,
                    conn_timeout=tcp_conn_timeout * 3)

            with self.node.connection.open_sftp() as sftp:
                out_err = read_from_remote(sftp, self.log_fl)
        finally:
            barrier.exit()

        self.on_result(out_err, cmd)

    def _wait_for_completion(self, end_of_wait_time, tcp_conn_timeout,
                             poll_interval=1):
        """Poll the agent pid file; return True when it disappears.

        The test counts as finished once the pid file has been read at least
        once and is no longer readable. Raises RuntimeError if the pid file
        is never seen within 30 seconds. Connection errors are logged and
        retried. If ``end_of_wait_time`` passes first, this logs a warning
        and returns False.
        """
        # poll_interval was once meant to be random.randint(30, 90)
        pid = None
        no_pid_file = True
        pid_get_timeout = 30 + time.time()
        connection_ok = True
        conn_id = self.node.get_conn_id()

        while end_of_wait_time > time.time():
            conn = None
            time.sleep(poll_interval)

            try:
                if self.node.connection is not Local:
                    conn = connect(self.node.conn_url,
                                   conn_timeout=tcp_conn_timeout)
                else:
                    conn = self.node.connection

                try:
                    with conn.open_sftp() as sftp:
                        try:
                            pid = read_from_remote(sftp, self.pid_file)
                            no_pid_file = False
                        except (NameError, IOError):
                            no_pid_file = True
                finally:
                    if conn is not Local:
                        conn.close()
                        conn = None

                if no_pid_file:
                    if pid is None:
                        if time.time() > pid_get_timeout:
                            msg = ("On node {0} pid file doesn't " +
                                   "appears in time")
                            logger.error(msg.format(conn_id))
                            raise RuntimeError("Start timeout")
                    else:
                        # pid file was seen earlier and is gone now:
                        # execution finished
                        return True

                if not connection_ok:
                    msg = "Connection with {0} is restored"
                    logger.debug(msg.format(conn_id))
                    connection_ok = True

            except (socket.error, SSHException, EOFError) as exc:
                if connection_ok:
                    connection_ok = False
                    msg = "Lost connection with " + conn_id
                    msg += ". Error: " + str(exc)
                    logger.debug(msg)

        msg = "On node {0} test did not finish in time, " + \
              "results may be incomplete"
        logger.warning(msg.format(conn_id))
        return False

    def on_result(self, out_err, cmd):
        """Parse agent output and pass each item to ``on_result_cb``."""
        try:
            for data in parse_output(out_err):
                self.on_result_cb(data)
        except Exception as exc:
            msg_templ = "Error during postprocessing results: {0!s}"
            raise RuntimeError(msg_templ.format(exc))

    def merge_results(self, results):
        """Merge results from several nodes into ``results[0]``.

        ``results[0]`` is modified in place and returned; None is returned
        for an empty input. Every result must have the same ``'__meta__'``
        and the same test names and field names. Fields in
        ``mergable_fields`` are list-extended; any other field must be equal
        across results. Mismatches raise AssertionError.
        """
        if not results:
            return None

        merged_result = results[0]
        merged_data = merged_result['res']
        expected_keys = set(merged_data.keys())
        mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']

        for res in results[1:]:
            assert res['__meta__'] == merged_result['__meta__']

            data = res['res']
            diff = set(data.keys()).symmetric_difference(expected_keys)

            msg = "Difference: {0}".format(",".join(diff))
            assert len(diff) == 0, msg

            for testname, test_data in data.items():
                res_test_data = merged_data[testname]

                diff = set(test_data.keys()).symmetric_difference(
                    res_test_data.keys())

                msg = "Difference: {0}".format(",".join(diff))
                assert len(diff) == 0, msg

                for k, v in test_data.items():
                    if k in mergable_fields:
                        res_test_data[k].extend(v)
                    else:
                        msg = "{0!r} != {1!r}".format(res_test_data[k], v)
                        assert res_test_data[k] == v, msg

        return merged_result

    @classmethod
    def format_for_console(cls, data):
        """Render merged results with the io formatter."""
        return io_formatter.format_results_for_console(data)
391 return io_formatter.format_results_for_console(data)