blob: 116e116cd750f025403e7db0bb115332c5ce5662 [file] [log] [blame]
koder aka kdanilov4643fd62015-02-10 16:20:13 -08001import abc
koder aka kdanilov66839a92015-04-11 13:22:31 +03002import time
koder aka kdanilov783b4542015-04-23 18:57:04 +03003import socket
koder aka kdanilov4d4771c2015-04-23 01:32:02 +03004import random
koder aka kdanilov4643fd62015-02-10 16:20:13 -08005import os.path
koder aka kdanilove21d7472015-02-14 19:02:04 -08006import logging
koder aka kdanilovea22c3d2015-04-21 03:42:22 +03007import datetime
koder aka kdanilove21d7472015-02-14 19:02:04 -08008
koder aka kdanilov783b4542015-04-23 18:57:04 +03009from paramiko import SSHException
10
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030011from wally.utils import ssize_to_b, open_for_append_or_create, sec_to_str
12
koder aka kdanilov4d4771c2015-04-23 01:32:02 +030013from wally.ssh_utils import (copy_paths, run_over_ssh,
14 save_to_remote, ssh_mkdir,
15 # delete_file,
16 connect, read_from_remote, Local)
17
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030018from . import postgres
19from .io import agent as io_agent
20from .io import formatter as io_formatter
21from .io.results_loader import parse_output
koder aka kdanilov652cd802015-04-13 12:21:07 +030022
koder aka kdanilov4643fd62015-02-10 16:20:13 -080023
# Module-wide logger; note it is the shared "wally" logger, not __name__.
logger = logging.getLogger(name="wally")
koder aka kdanilove21d7472015-02-14 19:02:04 -080025
26
class IPerfTest(object):
    """Base class for a performance test executed against a single node.

    Subclasses provide ``run``; ``pre_run`` and ``cleanup`` are optional
    hooks which do nothing unless overridden.
    """

    def __init__(self, on_result_cb, test_uuid, node, log_directory=None):
        # Every constructor argument is kept as a same-named attribute.
        self.node = node
        self.test_uuid = test_uuid
        self.log_directory = log_directory
        self.on_result_cb = on_result_cb

    def pre_run(self):
        """Hook called before ``run``; a no-op in the base class."""

    def cleanup(self):
        """Hook for removing test leftovers; a no-op in the base class."""

    @abc.abstractmethod
    def run(self, barrier):
        """Execute the test; subclasses must override this."""

    @classmethod
    def format_for_console(cls, data):
        """Render results for the console; unsupported unless overridden.

        :raises NotImplementedError: always, naming the concrete class.
        """
        raise NotImplementedError(
            "{0}.format_for_console".format(cls.__name__))

    def run_over_ssh(self, cmd, **kwargs):
        """Run ``cmd`` on this test's node via the module-level helper.

        Extra keyword arguments are forwarded unchanged; the node's
        connection id is passed as ``node``.
        """
        connection = self.node.connection
        conn_id = self.node.get_conn_id()
        return run_over_ssh(connection, cmd, node=conn_id, **kwargs)
52
koder aka kdanilov4643fd62015-02-10 16:20:13 -080053
class TwoScriptTest(IPerfTest):
    """Test driven by two scripts copied to and executed on the node.

    ``prepare_script`` is executed by ``pre_run`` and ``run_script`` by
    ``run``.  Either may come from a subclass attribute or be overridden
    through ``opts``.  The run script's output is parsed as ``key:value``
    lines whose values are floats; each pair goes to ``on_result_cb``.
    """

    # Remote directory the scripts are copied into.
    remote_tmp_dir = '/tmp'

    def __init__(self, opts, *dt, **mp):
        IPerfTest.__init__(self, *dt, **mp)
        self.opts = opts

        # Each script can be overridden independently.  Previously
        # 'prepare_script' was only read (and then required, raising
        # KeyError if absent) when 'run_script' was given.
        if 'run_script' in self.opts:
            self.run_script = self.opts['run_script']
        if 'prepare_script' in self.opts:
            self.prepare_script = self.opts['prepare_script']

    def get_remote_for_script(self, script):
        """Return the remote path for local ``script`` (same basename)."""
        # Was ``self.tmp_dir``, which is never defined (AttributeError).
        return os.path.join(self.remote_tmp_dir, script.rpartition('/')[2])

    def copy_script(self, src):
        """Copy local script ``src`` to the node; return its remote path."""
        remote_path = self.get_remote_for_script(src)
        copy_paths(self.node.connection, {src: remote_path})
        return remote_path

    def pre_run(self):
        """Copy the prepare script to the node and execute it."""
        # Previously passed an extra connection argument to copy_script
        # (TypeError) and read the non-existent ``pre_run_script``.
        remote_script = self.copy_script(self.prepare_script)
        self.run_over_ssh(remote_script)

    def run(self, barrier):
        """Copy and execute the run script, passing ``opts`` as arguments.

        NOTE(review): ``barrier`` is not used here, unlike IOPerfTest.run,
        which waits on and exits it - confirm this is intended.
        """
        remote_script = self.copy_script(self.run_script)
        cmd_opts = ' '.join("%s %s" % (key, val)
                            for key, val in self.opts.items())
        cmd = remote_script + ' ' + cmd_opts
        out_err = self.run_over_ssh(cmd)
        self.on_result(out_err, cmd)

    def parse_results(self, out):
        """Feed every ``key:value`` line of ``out`` to ``on_result_cb``.

        Lines with an empty key or value are skipped.  A non-float value
        raises ValueError.
        """
        for line in out.split("\n"):
            key, _, value = line.partition(":")
            if key and value:
                self.on_result_cb((key, float(value)))

    def on_result(self, out_err, cmd):
        """Parse ``out_err``, wrapping any failure in RuntimeError."""
        try:
            self.parse_results(out_err)
        except Exception as exc:
            # ``exc.message`` is deprecated in Python 2 and gone in 3.
            msg_templ = "Error during postprocessing results: {0!r}. {1}"
            raise RuntimeError(msg_templ.format(exc, out_err))
Yulia Portnova7ddfa732015-02-24 17:32:58 +020099
100
class PgBenchTest(TwoScriptTest):
    """Two-script test using ``prepare.sh`` and ``run.sh``.

    Both scripts are taken from the directory of the ``postgres`` package.
    """
    root = os.path.dirname(postgres.__file__)
    prepare_script, run_script = (os.path.join(root, "prepare.sh"),
                                  os.path.join(root, "run.sh"))
koder aka kdanilov4500a5f2015-04-17 16:55:17 +0300105
106
class IOPerfTest(IPerfTest):
    """Disk IO test running the io agent script on a remote node.

    The agent is copied to a per-test directory under /tmp/test/<uuid>.
    It is started detached inside ``screen``, then polled via its pid file
    until it exits.  Its results file is then parsed and every record is
    passed to ``on_result_cb``.
    """

    # Remote paths; '{0}' is replaced by the test uuid.
    io_py_remote_templ = "/tmp/test/{0}/io/disk_test_agent.py"
    log_fl_templ = "/tmp/test/{0}/io/disk_test_agent_log.txt"
    pid_file_templ = "/tmp/test/{0}/io/disk_test_agent_pid_file"
    task_file_templ = "/tmp/test/{0}/io/io_task.cfg"

    def __init__(self, test_options, *dt, **mp):
        IPerfTest.__init__(self, *dt, **mp)
        self.options = test_options
        self.config_fname = test_options['cfg']
        self.alive_check_interval = test_options.get('alive_check_interval')
        self.config_params = test_options.get('params', {})
        self.tool = test_options.get('tool', 'fio')

        # Read the config with a context manager so the handle is closed.
        with open(self.config_fname) as cfg_fd:
            self.raw_cfg = cfg_fd.read()

        self.configs = list(io_agent.parse_all_in_1(self.raw_cfg,
                                                    self.config_params))

        cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
        raw_res = os.path.join(self.log_directory, "raw_results.txt")

        self.io_py_remote = self.io_py_remote_templ.format(self.test_uuid)
        self.log_fl = self.log_fl_templ.format(self.test_uuid)
        self.pid_file = self.pid_file_templ.format(self.test_uuid)
        self.task_file = self.task_file_templ.format(self.test_uuid)

        # Dump the compiled per-section configs to the local log directory.
        # The file was previously never closed.
        fio_command_file = open_for_append_or_create(cmd_log)
        try:
            cfg_s_it = io_agent.compile_all_in_1(self.raw_cfg,
                                                 self.config_params)
            splitter = "\n\n" + "-" * 60 + "\n\n"
            fio_command_file.write(splitter.join(cfg_s_it))
        finally:
            fio_command_file.close()

        # Kept open as an attribute; not written anywhere in this class.
        self.fio_raw_results_file = open_for_append_or_create(raw_res)

    def cleanup(self):
        # TODO: remove the temporary files created on the node under
        # /tmp/test/<uuid> (e.g. self.io_py_remote); nothing is removed yet.
        pass

    def pre_run(self):
        """Prepare the node for ``run``.

        Creates the remote directory, ensures fio is installed and copies
        the agent script.  Unless option 'prefill_files' is false, it also
        pre-fills the test files.
        """
        with self.node.connection.open_sftp() as sftp:
            ssh_mkdir(sftp,
                      os.path.dirname(self.io_py_remote),
                      intermediate=True)

        self._ensure_fio_installed()

        # Always copy the .py source, even if the module was loaded from .pyc.
        local_fname = os.path.splitext(io_agent.__file__)[0] + ".py"
        self.files_to_copy = {local_fname: self.io_py_remote}
        copy_paths(self.node.connection, self.files_to_copy)

        if self.options.get('prefill_files', True):
            self._prefill_files()
        else:
            logger.warning("Test files prefill disabled")

    def _ensure_fio_installed(self, attempts=3, retry_delay=3):
        """Install fio via apt-get if ``which fio`` fails; retry on error.

        :raises OSError: if every installation attempt failed.
        """
        try:
            self.run_over_ssh('which fio')
            return
        except OSError:
            pass

        # TODO: install fio on non-apt distributions too
        last_err = None
        for _ in range(attempts):
            try:
                self.run_over_ssh("sudo apt-get -y install fio")
                return
            except OSError as err:
                # Keep the error: on Python 3 ``err`` is unbound once the
                # except block ends, and ``.message`` does not exist there.
                last_err = err
                time.sleep(retry_delay)

        raise OSError("Can't install fio - " + str(last_err))

    def _prefill_files(self):
        """Fill each test file with zeros using dd.

        Each file is filled up to the largest size (rounded up to whole
        MiB) that any config section requests for it.
        """
        mib = 1024 ** 2
        files = {}

        for section in self.configs:
            sz = ssize_to_b(section.vals['size'])
            # '//' keeps the block count integral on Python 3 as well.
            msz = sz // mib
            if sz % mib != 0:
                msz += 1

            fname = section.vals['filename']

            # if already has other test with the same file name
            # take largest size
            files[fname] = max(files.get(fname, 0), msz)

        cmd_templ = "dd oflag=direct " + \
                    "if=/dev/zero of={0} bs={1} count={2}"

        if self.options.get("use_sudo", True):
            cmd_templ = "sudo " + cmd_templ

        ssize = 0
        stime = time.time()

        for fname, curr_sz in files.items():
            cmd = cmd_templ.format(fname, mib, curr_sz)
            ssize += curr_sz
            # Timeout in seconds equals the size in MiB (assumes >= 1 MiBps).
            self.run_over_ssh(cmd, timeout=curr_sz)

        ddtime = time.time() - stime
        if ddtime > 1E-3:
            fill_bw = int(ssize / ddtime)
            mess = "Initiall dd fill bw is {0} MiBps for this vm"
            logger.info(mess.format(fill_bw))

    def run(self, barrier):
        """Start the agent in sync with the barrier and collect results.

        :param barrier: object with ``wait()``, whose truthy result gates the
            start, and ``exit()``, which is always called once this node
            finishes (successfully or not).
        :raises RuntimeError: if ``barrier.wait()`` returns false, if the pid
            file does not appear in time, or if results can't be parsed.
        """
        cmd_templ = "screen -S {screen_name} -d -m " + \
                    "env python2 {0} -p {pid_file} -o {results_file} " + \
                    "--type {1} {2} --json {3}"

        if self.options.get("use_sudo", True):
            cmd_templ = "sudo " + cmd_templ

        params = " ".join("{0}={1}".format(k, v)
                          for k, v in self.config_params.items())

        if params:
            params = "--params " + params

        with self.node.connection.open_sftp() as sftp:
            save_to_remote(sftp, self.task_file, self.raw_cfg)

        screen_name = self.test_uuid
        cmd = cmd_templ.format(self.io_py_remote,
                               self.tool,
                               params,
                               self.task_file,
                               pid_file=self.pid_file,
                               results_file=self.log_fl,
                               screen_name=screen_name)
        logger.debug("Waiting on barrier")

        exec_time = io_agent.calculate_execution_time(self.configs)

        try:
            timeout = int(exec_time * 2 + 300)
            if not barrier.wait():
                # Previously this fell through to on_result() with
                # out_err unbound (UnboundLocalError).
                # NOTE(review): presumably a false result means another
                # participant gave up - confirm against the Barrier impl.
                msg = "Barrier wait failed, test on node {0} wasn't started"
                raise RuntimeError(msg.format(self.node.get_conn_id()))

            self._log_expected_timing(exec_time, timeout)

            self.run_over_ssh(cmd)
            logger.debug("Test started in screen {0}".format(screen_name))

            self._wait_for_finish(timeout)
            logger.debug("Done")

            # Was ``read_from_remote(, self.log_fl)`` - a syntax error.
            with self.node.connection.open_sftp() as sftp:
                out_err = read_from_remote(sftp, self.log_fl)
        finally:
            barrier.exit()

        self.on_result(out_err, cmd)

    def _log_expected_timing(self, exec_time, timeout):
        """Log the expected duration, finish time and wait deadline."""
        templ = "Test should takes about {0}." + \
                " Should finish at {1}," + \
                " will wait at most till {2}"
        now_dt = datetime.datetime.now()
        end_dt = now_dt + datetime.timedelta(0, exec_time)
        wait_till = now_dt + datetime.timedelta(0, timeout)

        logger.info(templ.format(sec_to_str(exec_time),
                                 end_dt.strftime("%H:%M:%S"),
                                 wait_till.strftime("%H:%M:%S")))

    def _read_pid_file(self, conn):
        """Read the agent pid file over ``conn``.

        :returns: ``(True, contents)`` if it was read, ``(False, None)`` if
            reading raised NameError/IOError.
        """
        try:
            with conn.open_sftp() as sftp:
                return True, read_from_remote(sftp, self.pid_file)
        except (NameError, IOError):
            return False, None

    def _wait_for_finish(self, timeout):
        """Poll the pid file until the agent exits or ``timeout`` passes.

        The node connection is closed while polling, with a fresh
        connection opened per check, and reopened at the end.  The pid file
        must appear within 30 s.  Once it has been seen, its disappearance
        means the agent finished.

        :raises RuntimeError: if the pid file never appears in time.
        """
        end_of_wait_time = timeout + time.time()

        # time_till_check = random.randint(30, 90)
        time_till_check = 1

        pid = None
        tcp_conn_timeout = 30
        pid_get_timeout = 30 + time.time()
        connection_ok = True

        # TODO: add monitoring socket
        if self.node.connection is not Local:
            self.node.connection.close()

        conn_id = self.node.get_conn_id()

        while end_of_wait_time > time.time():
            time.sleep(time_till_check)

            try:
                if self.node.connection is not Local:
                    conn = connect(self.node.conn_url,
                                   conn_timeout=tcp_conn_timeout)
                else:
                    conn = self.node.connection

                # Close the per-check connection even if reading fails;
                # the old code leaked it and also called sftp.close() on a
                # possibly-unbound name.
                try:
                    pid_found, curr_pid = self._read_pid_file(conn)
                finally:
                    if conn is not Local:
                        conn.close()

                if pid_found:
                    pid = curr_pid
                elif pid is None:
                    if time.time() > pid_get_timeout:
                        msg = ("On node {0} pid file doesn't " +
                               "appears in time")
                        logger.error(msg.format(conn_id))
                        raise RuntimeError("Start timeout")
                else:
                    # pid file was seen earlier and is gone: execution
                    # finished
                    break

                if not connection_ok:
                    msg = "Connection with {0} is restored"
                    logger.debug(msg.format(conn_id))
                    connection_ok = True

            except (socket.error, SSHException) as exc:
                if connection_ok:
                    connection_ok = False
                    # str(exc) instead of the py2-only exc.message
                    msg = "Lost connection with {0}. Error: {1}"
                    logger.debug(msg.format(conn_id, exc))
        else:
            logger.warning(("Timeout while waiting for test on node {0} " +
                            "to finish").format(conn_id))

        if self.node.connection is not Local:
            self.node.connection = connect(self.node.conn_url,
                                           conn_timeout=tcp_conn_timeout * 3)

    def on_result(self, out_err, cmd):
        """Parse agent output and pass every record to ``on_result_cb``.

        :raises RuntimeError: wrapping any error from parsing or the callback.
        """
        try:
            for data in parse_output(out_err):
                self.on_result_cb(data)
        except Exception as exc:
            # ``exc.message`` is deprecated in Python 2 and gone in 3.
            msg_templ = "Error during postprocessing results: {0!r}"
            raise RuntimeError(msg_templ.format(exc))

    def merge_results(self, results):
        """Merge per-run results with identical structure into one.

        List fields in ``mergable_fields`` are concatenated; every other
        field must be equal across runs.  The inputs are not modified
        (previously ``results[0]`` was extended in place and returned).

        :returns: the merged result, or None for an empty ``results``.
        """
        import copy

        if not results:
            return None

        merged_result = copy.deepcopy(results[0])
        merged_data = merged_result['res']
        expected_keys = set(merged_data.keys())
        mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']

        for res in results[1:]:
            assert res['__meta__'] == merged_result['__meta__']

            data = res['res']
            diff = set(data.keys()).symmetric_difference(expected_keys)

            msg = "Difference: {0}".format(",".join(diff))
            assert len(diff) == 0, msg

            for testname, test_data in data.items():
                res_test_data = merged_data[testname]

                diff = set(test_data.keys()).symmetric_difference(
                    res_test_data.keys())

                msg = "Difference: {0}".format(",".join(diff))
                assert len(diff) == 0, msg

                for k, v in test_data.items():
                    if k in mergable_fields:
                        res_test_data[k].extend(v)
                    else:
                        msg = "{0!r} != {1!r}".format(res_test_data[k], v)
                        assert res_test_data[k] == v, msg

        return merged_result

    @classmethod
    def format_for_console(cls, data):
        """Render results via the io formatter."""
        return io_formatter.format_results_for_console(data)
381 return io_formatter.format_results_for_console(data)