blob: 4b9db19349ed49439bea39c898fce9043fb58b5e [file] [log] [blame]
koder aka kdanilov4643fd62015-02-10 16:20:13 -08001import abc
koder aka kdanilov66839a92015-04-11 13:22:31 +03002import time
koder aka kdanilov783b4542015-04-23 18:57:04 +03003import socket
koder aka kdanilov4d4771c2015-04-23 01:32:02 +03004import random
koder aka kdanilov4643fd62015-02-10 16:20:13 -08005import os.path
koder aka kdanilove21d7472015-02-14 19:02:04 -08006import logging
koder aka kdanilovea22c3d2015-04-21 03:42:22 +03007import datetime
koder aka kdanilove21d7472015-02-14 19:02:04 -08008
koder aka kdanilova855f902015-04-26 14:31:45 +03009from paramiko import SSHException, SFTPError
koder aka kdanilov783b4542015-04-23 18:57:04 +030010
koder aka kdanilove2de58c2015-04-24 22:59:36 +030011from wally.utils import (ssize_to_b, open_for_append_or_create,
12 sec_to_str, StopTestError)
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030013
koder aka kdanilov4d4771c2015-04-23 01:32:02 +030014from wally.ssh_utils import (copy_paths, run_over_ssh,
koder aka kdanilovabd6ead2015-04-24 02:03:07 +030015 save_to_remote,
koder aka kdanilov4d4771c2015-04-23 01:32:02 +030016 # delete_file,
17 connect, read_from_remote, Local)
18
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030019from . import postgres
20from .io import agent as io_agent
21from .io import formatter as io_formatter
22from .io.results_loader import parse_output
koder aka kdanilov652cd802015-04-13 12:21:07 +030023
koder aka kdanilov4643fd62015-02-10 16:20:13 -080024
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030025logger = logging.getLogger("wally")
koder aka kdanilove21d7472015-02-14 19:02:04 -080026
27
class IPerfTest(object):
    """Common base of all performance tests executed against one node.

    Keeps the per-test settings handed in by the caller and provides a few
    small helpers (remote path building, coordination messages, running a
    command over ssh) for concrete tests.

    NOTE: ``run`` is decorated with ``abc.abstractmethod``, but this class
    declares no ``abc.ABCMeta`` metaclass, so the decorator only documents
    the intent and is not enforced at instantiation time.
    """

    def __init__(self, options, is_primary, on_result_cb, test_uuid, node,
                 log_directory=None,
                 coordination_queue=None,
                 remote_dir="/tmp/wally"):
        # what to run and where
        self.test_uuid = test_uuid
        self.node = node
        self.is_primary = is_primary
        self.options = options

        # where results and messages go
        self.on_result_cb = on_result_cb
        self.coordination_queue = coordination_queue
        self.log_directory = log_directory

        # directory used by join_remote() as the base for remote paths
        self.remote_dir = remote_dir

        # flipped to True by request_stop()
        self.stop_requested = False

    def request_stop(self):
        """Mark the test as asked to stop."""
        self.stop_requested = True

    def join_remote(self, path):
        """Return ``path`` joined onto the remote working directory."""
        return os.path.join(self.remote_dir, path)

    def coordinate(self, data):
        """Put ``(connection id, data)`` on the coordination queue, if any."""
        queue = self.coordination_queue
        if queue is None:
            return
        queue.put((self.node.get_conn_id(), data))

    def pre_run(self):
        """Hook called before ``run``; does nothing by default."""
        pass

    def cleanup(self):
        """Hook for removing test leftovers; does nothing by default."""
        pass

    @abc.abstractmethod
    def run(self, barrier):
        """Execute the test; concrete tests override this."""
        pass

    @classmethod
    def format_for_console(cls, data):
        """Format results for console output; must be overridden.

        Raises:
            NotImplementedError: always, naming the class it was called on.
        """
        raise NotImplementedError(
            "{0}.format_for_console".format(cls.__name__))

    def run_over_ssh(self, cmd, **kwargs):
        """Run ``cmd`` on this test's node via the module-level helper.

        Extra keyword arguments are forwarded unchanged; the node's
        connection id is passed as ``node``.
        """
        connection = self.node.connection
        conn_id = self.node.get_conn_id()
        return run_over_ssh(connection, cmd, node=conn_id, **kwargs)

    @classmethod
    def coordination_th(cls, coord_q, barrier, num_threads):
        """Coordination thread body; no-op in the base class."""
        pass
75
koder aka kdanilov4643fd62015-02-10 16:20:13 -080076
class TwoScriptTest(IPerfTest):
    """Test driven by two scripts: a preparation script and a run script.

    The scripts come from the ``run_script`` / ``prepare_script`` entries of
    ``options`` when ``run_script`` is present there; otherwise the instance
    relies on attributes of the same names being defined elsewhere (for
    example as class attributes of a subclass).
    """

    def __init__(self, *dt, **mp):
        IPerfTest.__init__(self, *dt, **mp)

        # Scripts given in the options take precedence over class defaults.
        if 'run_script' in self.options:
            self.run_script = self.options['run_script']
            self.prepare_script = self.options['prepare_script']

    def get_remote_for_script(self, script):
        """Return the remote path for ``script``.

        The script's base name (the part after the last '/') is placed
        inside the remote working directory.
        """
        # The original read the never-assigned ``self.tmp_dir``; the base
        # class keeps the remote directory in ``remote_dir``.
        return self.join_remote(script.rpartition('/')[2])

    def copy_script(self, src):
        """Copy local file ``src`` to the node and return its remote path."""
        remote_path = self.get_remote_for_script(src)
        copy_paths(self.node.connection, {src: remote_path})
        return remote_path

    def pre_run(self):
        """Upload the preparation script and execute it on the node."""
        # copy_script() takes only the source path, and the attribute set
        # in __init__ is ``prepare_script`` (not ``pre_run_script``).
        remote_script = self.copy_script(self.prepare_script)
        self.run_over_ssh(remote_script)

    def run(self, barrier):
        """Upload the run script, execute it and feed its output to
        ``on_result``.

        Every ``options`` item is appended to the command line as
        ``"<key> <value>"``. ``barrier`` is not used by this method.
        """
        remote_script = self.copy_script(self.run_script)
        cmd_opts = ' '.join(["%s %s" % (key, val) for key, val
                             in self.options.items()])
        cmd = remote_script + ' ' + cmd_opts
        out_err = self.run_over_ssh(cmd)
        self.on_result(out_err, cmd)

    def parse_results(self, out):
        """Report every ``key:value`` line of ``out`` via ``on_result_cb``.

        Lines with an empty key or an empty value are skipped; the value is
        converted with ``float`` (which raises ValueError on bad input).
        """
        for line in out.split("\n"):
            key, separator, value = line.partition(":")
            if key and value:
                self.on_result_cb((key, float(value)))

    def on_result(self, out_err, cmd):
        """Parse command output, wrapping any failure in RuntimeError.

        Raises:
            RuntimeError: if parsing or the result callback fails; the
                message contains the original error and the raw output.
        """
        try:
            self.parse_results(out_err)
        except Exception as exc:
            msg_templ = "Error during postprocessing results: {0!s}. {1}"
            raise RuntimeError(msg_templ.format(exc, out_err))
Yulia Portnova7ddfa732015-02-24 17:32:58 +0200119
120
class PgBenchTest(TwoScriptTest):
    """Two-script test using the scripts located next to ``postgres``."""

    # directory containing the ``postgres`` module
    root = os.path.dirname(postgres.__file__)

    # scripts consumed by TwoScriptTest.run() / TwoScriptTest.pre_run()
    run_script = os.path.join(root, "run.sh")
    prepare_script = os.path.join(root, "prepare.sh")
koder aka kdanilov4500a5f2015-04-17 16:55:17 +0300125
126
class IOPerfTest(IPerfTest):
    """IO performance test: runs the remote agent with the parsed job
    configs inside a detached ``screen`` session and collects its output.
    """

    # passed as ``conn_timeout`` when (re)connecting to the node
    tcp_conn_timeout = 30
    # how long wait_till_finished() waits for the pid file to appear
    # (name kept as is for compatibility)
    max_pig_timeout = 5
    # passed as ``soft_runcycle`` to the config slicer when the
    # ``test_logging`` option is enabled
    soft_runcycle = 5 * 60

    def __init__(self, *dt, **mp):
        """Read and parse the job config and prepare local log files.

        Reads the file named by ``options['cfg']``, parses it, writes the
        compiled configs to ``task_compiled.cfg`` in the log directory and
        opens ``raw_results.txt`` there.
        """
        IPerfTest.__init__(self, *dt, **mp)
        self.config_fname = self.options['cfg']
        self.alive_check_interval = self.options.get('alive_check_interval')
        self.config_params = self.options.get('params', {})
        self.tool = self.options.get('tool', 'fio')

        # close the config file deterministically instead of leaking it
        with open(self.config_fname) as cfg_fd:
            self.raw_cfg = cfg_fd.read()

        self.configs = list(io_agent.parse_all_in_1(self.raw_cfg,
                                                    self.config_params))

        cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
        raw_res = os.path.join(self.log_directory, "raw_results.txt")

        self.io_py_remote = self.join_remote("agent.py")
        self.log_fl = self.join_remote("log.txt")
        self.pid_file = self.join_remote("pid")
        self.task_file = self.join_remote("task.cfg")
        self.use_sudo = self.options.get("use_sudo", True)
        self.test_logging = self.options.get("test_logging", False)

        if self.test_logging:
            soft_runcycle = self.soft_runcycle
        else:
            soft_runcycle = None

        self.fio_configs = list(io_agent.parse_and_slice_all_in_1(
            self.raw_cfg,
            self.config_params,
            soft_runcycle=soft_runcycle))

        splitter = "\n\n" + "-" * 60 + "\n\n"
        cfg = splitter.join(map(io_agent.fio_config_to_str,
                                self.fio_configs))

        # the compiled-config log is written once: close it so the data is
        # flushed and the handle is not leaked
        fio_command_file = open_for_append_or_create(cmd_log)
        try:
            fio_command_file.write(cfg)
        finally:
            fio_command_file.close()

        self.fio_raw_results_file = open_for_append_or_create(raw_res)

    def __str__(self):
        return "{0}({1})".format(self.__class__.__name__,
                                 self.node.get_conn_id())

    def cleanup(self):
        """Placeholder: temporary remote files are not removed yet."""
        # delete_file(conn, self.io_py_remote)
        # Need to remove temporary files used for testing
        pass

    def prefill_test_files(self):
        """Fill every test file with zeroes using ``dd`` on the node.

        The size of each file is the largest ``size`` among config
        sections sharing its ``filename``, rounded up to whole MiB. The
        measured fill bandwidth is logged and sent via ``coordinate``.
        """
        mib = 1024 ** 2
        files = {}

        for section in self.configs:
            sz = ssize_to_b(section.vals['size'])

            # floor division keeps the dd block count an integer under
            # true division as well; then round up to a whole MiB
            msz = sz // mib
            if sz % mib != 0:
                msz += 1

            fname = section.vals['filename']

            # if another section uses the same file, take the largest size
            files[fname] = max(files.get(fname, 0), msz)

        cmd_templ = "dd oflag=direct " + \
                    "if=/dev/zero of={0} bs={1} count={2}"

        if self.use_sudo:
            cmd_templ = "sudo " + cmd_templ

        ssize = 0
        stime = time.time()

        for fname, curr_sz in files.items():
            cmd = cmd_templ.format(fname, mib, curr_sz)
            ssize += curr_sz
            self.run_over_ssh(cmd, timeout=curr_sz)

        ddtime = time.time() - stime
        # skip the bandwidth report when (almost) no time was spent
        if ddtime > 1E-3:
            fill_bw = int(ssize / ddtime)
            mess = "Initiall dd fill bw is {0} MiBps for this vm"
            logger.info(mess.format(fill_bw))
            self.coordinate(('init_bw', fill_bw))

    def install_utils(self, max_retry=3, timeout=5):
        """Install ``fio`` and ``screen`` on the node when missing.

        Args:
            max_retry: number of ``apt-get`` attempts.
            timeout: seconds slept after each failed attempt.

        Raises:
            OSError: if every installation attempt failed.
        """
        need_install = []
        for bin_name, package in (('fio', 'fio'), ('screen', 'screen')):
            try:
                self.run_over_ssh('which ' + bin_name, nolog=True)
            except OSError:
                need_install.append(package)

        if not need_install:
            return

        cmd = "sudo apt-get -y install " + " ".join(need_install)

        # keep the last error in a separate name: the ``except ... as``
        # target is unbound if no attempt ran and is deleted after the
        # clause on Python 3
        last_err = None
        for _ in range(max_retry):
            try:
                self.run_over_ssh(cmd)
                return
            except OSError as err:
                last_err = err
                time.sleep(timeout)

        raise OSError("Can't install - " + str(last_err))

    def pre_run(self):
        """Prepare the node: remote dir, tools, agent script, test files.

        Raises:
            StopTestError: if the remote directory can't be created.
        """
        try:
            cmd = 'mkdir -p "{0}"'.format(self.remote_dir)
            if self.use_sudo:
                cmd = "sudo " + cmd
                cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
                                                      self.remote_dir)

            self.run_over_ssh(cmd)
        except Exception as exc:
            msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
            msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
            logger.error(msg)
            raise StopTestError(msg, exc)

        self.install_utils()

        # always upload the source file, even if the module was loaded
        # from a compiled file
        local_fname = os.path.splitext(io_agent.__file__)[0] + ".py"
        files_to_copy = {local_fname: self.io_py_remote}
        copy_paths(self.node.connection, files_to_copy)

        if self.options.get('prefill_files', True):
            self.prefill_test_files()
        elif self.is_primary:
            logger.warning("Prefilling of test files is disabled")

    def check_process_is_running(self, sftp, pid):
        """Return True if ``/proc/<pid>`` can be stat'ed through ``sftp``."""
        try:
            sftp.stat("/proc/{0}".format(pid))
            return True
        except (OSError, IOError, NameError):
            return False

    def kill_remote_process(self, conn, pid, soft=True):
        """Send ``kill`` (or ``kill -9`` when not ``soft``) to ``pid``.

        ``conn`` is accepted for interface compatibility and is not used.
        Returns True on success, False if the command raised OSError.
        """
        try:
            if soft:
                cmd = "kill {0}"
            else:
                cmd = "kill -9 {0}"

            if self.use_sudo:
                cmd = "sudo " + cmd

            self.run_over_ssh(cmd.format(pid))
            return True
        except OSError:
            return False

    def get_test_status(self, die_timeout=3):
        """Probe the node over a fresh connection.

        ``die_timeout`` is currently unused.

        Returns:
            ``(is_connected, is_running, pid, err)``: whether the node was
            reachable, whether the pid file exists and its process is
            alive, the pid file content (or None), and the connection
            error text (or None).
        """
        is_connected = None
        is_running = None
        pid = None
        err = None

        try:
            conn = connect(self.node.conn_url,
                           conn_timeout=self.tcp_conn_timeout)
            with conn:
                with conn.open_sftp() as sftp:
                    try:
                        pid = read_from_remote(sftp, self.pid_file)
                        is_running = True
                    except (NameError, IOError):
                        pid = None
                        is_running = False

                    if is_running:
                        if not self.check_process_is_running(sftp, pid):
                            # stale pid file: process is already gone
                            sftp.remove(self.pid_file)
                            is_running = False

            is_connected = True

        except (socket.error, SSHException, EOFError, SFTPError) as exc:
            err = str(exc)
            is_connected = False

        return is_connected, is_running, pid, err

    def wait_till_finished(self, soft_timeout, timeout):
        """Poll the node until the test process finishes.

        Args:
            soft_timeout: seconds from now before which, once the process
                is confirmed running, no further polling is done.
            timeout: seconds from now after which polling stops; the
                method then returns without raising.

        Raises:
            StopTestError: if the pid file doesn't appear in time.
        """
        conn_id = self.node.get_conn_id()
        end_of_wait_time = timeout + time.time()
        soft_end_of_wait_time = soft_timeout + time.time()

        # time_till_check = random.randint(30, 90)
        time_till_check = 5
        pid = None
        is_running = False
        pid_get_timeout = self.max_pig_timeout + time.time()
        curr_connected = True

        while end_of_wait_time > time.time():
            time.sleep(time_till_check)

            is_connected, is_running, npid, err = self.get_test_status()

            if is_connected and not is_running:
                if pid is None:
                    # process was never seen: maybe it has not started yet
                    if time.time() > pid_get_timeout:
                        msg = ("On node {0} pid file doesn't " +
                               "appears in time")
                        logger.error(msg.format(conn_id))
                        raise StopTestError("Start timeout")
                else:
                    # execution finished
                    break

            if npid is not None:
                pid = npid

            if is_connected and pid is not None and is_running:
                # read the clock once: a second read could make the
                # difference negative, which time.sleep() rejects
                sleep_time = soft_end_of_wait_time - time.time()
                if sleep_time > 0:
                    time.sleep(sleep_time)

            if is_connected and not curr_connected:
                msg = "Connection with {0} is restored"
                logger.debug(msg.format(conn_id))
            elif not is_connected and curr_connected:
                msg = "Lost connection with " + conn_id + ". Error: " + err
                logger.debug(msg)

            curr_connected = is_connected

    def run(self, barrier):
        """Run every config slice and pass parsed results to the callback.

        ``barrier.exit()`` is always called at the end, even on failure.

        Raises:
            RuntimeError: if result post-processing fails (OSError and
                StopTestError are propagated unchanged).
        """
        try:
            if len(self.fio_configs) > 1 and self.is_primary:

                exec_time = 0
                for test in self.fio_configs:
                    exec_time += io_agent.calculate_execution_time(test)

                exec_time_s = sec_to_str(exec_time)
                now_dt = datetime.datetime.now()
                end_dt = now_dt + datetime.timedelta(0, exec_time)
                msg = "Entire test should takes aroud: {0} and finished at {1}"
                logger.info(msg.format(exec_time_s,
                                       end_dt.strftime("%H:%M:%S")))

            for pos, fio_cfg_slice in enumerate(self.fio_configs):
                names = [i.name for i in fio_cfg_slice]

                # one entry per distinct name, in first-seen order;
                # repeated names are shown as "name * count"
                msgs = []
                already_processed = set()
                for name in names:
                    if name not in already_processed:
                        already_processed.add(name)

                        count = names.count(name)
                        if 1 == count:
                            msgs.append(name)
                        else:
                            frmt = "{0} * {1}"
                            msgs.append(frmt.format(name, count))

                if self.is_primary:
                    logger.info("Will run tests: " + ", ".join(msgs))

                # only the primary node logs, and only for the first slice
                nolog = (pos != 0) or not self.is_primary
                out_err = self.do_run(barrier, fio_cfg_slice, nolog=nolog)

                try:
                    for data in parse_output(out_err):
                        data['__meta__']['raw_cfg'] = self.raw_cfg
                        self.on_result_cb(data)
                except (OSError, StopTestError):
                    raise
                except Exception as exc:
                    msg_templ = "Error during postprocessing results: {0!s}"
                    raise RuntimeError(msg_templ.format(exc))

        finally:
            barrier.exit()

    def do_run(self, barrier, cfg, nolog=False):
        """Run one config slice on the node and return the raw results.

        Uploads ``cfg`` to the remote task file, waits on ``barrier``,
        starts the agent in a detached ``screen`` session, drops the ssh
        connection while the test runs (unless the connection is
        ``Local``), polls until the test finishes, reconnects and returns
        the content of the remote results file.
        """
        conn_id = self.node.get_conn_id()

        cmd_templ = "screen -S {screen_name} -d -m " + \
            "env python2 {0} -p {pid_file} -o {results_file} " + \
            "--type {1} {2} --json {3}"

        # same value as options.get("use_sudo", True), cached in __init__
        if self.use_sudo:
            cmd_templ = "sudo " + cmd_templ

        params = " ".join("{0}={1}".format(k, v)
                          for k, v in self.config_params.items())

        if "" != params:
            params = "--params " + params

        with self.node.connection.open_sftp() as sftp:
            save_to_remote(sftp, self.task_file,
                           io_agent.fio_config_to_str(cfg))

        screen_name = self.test_uuid
        cmd = cmd_templ.format(self.io_py_remote,
                               self.tool,
                               params,
                               self.task_file,
                               pid_file=self.pid_file,
                               results_file=self.log_fl,
                               screen_name=screen_name)

        exec_time = io_agent.calculate_execution_time(cfg)
        exec_time_str = sec_to_str(exec_time)

        # hard limit: expected time plus a reserve of at least 300
        timeout = int(exec_time + max(300, exec_time))
        barrier.wait()
        self.run_over_ssh(cmd, nolog=nolog)

        if self.is_primary:
            templ = "Test should takes about {0}." + \
                    " Should finish at {1}," + \
                    " will wait at most till {2}"
            now_dt = datetime.datetime.now()
            end_dt = now_dt + datetime.timedelta(0, exec_time)
            wait_till = now_dt + datetime.timedelta(0, timeout)

            logger.info(templ.format(exec_time_str,
                                     end_dt.strftime("%H:%M:%S"),
                                     wait_till.strftime("%H:%M:%S")))

        if not nolog:
            msg = "Tests started in screen {1} on each testnode"
            logger.debug(msg.format(conn_id, screen_name))

        # TODO: add monitoring socket
        if self.node.connection is not Local:
            self.node.connection.close()

        # wait_till_finished() takes (soft_timeout, timeout); the expected
        # execution time is the soft limit, ``timeout`` the hard one
        self.wait_till_finished(exec_time, timeout)
        if not nolog:
            logger.debug("Test on node {0} is finished".format(conn_id))

        if self.node.connection is not Local:
            conn_timeout = self.tcp_conn_timeout * 3
            self.node.connection = connect(self.node.conn_url,
                                           conn_timeout=conn_timeout)

        with self.node.connection.open_sftp() as sftp:
            return read_from_remote(sftp, self.log_fl)

    def merge_results(self, results):
        """Merge a list of result dicts into the first one (in place).

        Values of 'bw', 'clat', 'iops', 'lat' and 'slat' are extended; all
        other fields, and ``__meta__``, must be equal across results
        (checked with ``assert``). Returns None for an empty list.
        """
        if len(results) == 0:
            return None

        merged_result = results[0]
        merged_data = merged_result['res']
        mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']

        for res in results[1:]:
            assert res['__meta__'] == merged_result['__meta__']
            data = res['res']

            for testname, test_data in data.items():
                if testname not in merged_data:
                    merged_data[testname] = test_data
                    continue

                res_test_data = merged_data[testname]

                diff = set(test_data.keys()).symmetric_difference(
                    res_test_data.keys())

                msg = "Difference: {0}".format(",".join(diff))
                assert len(diff) == 0, msg

                for k, v in test_data.items():
                    if k in mergable_fields:
                        res_test_data[k].extend(v)
                    else:
                        msg = "{0!r} != {1!r}".format(res_test_data[k], v)
                        assert res_test_data[k] == v, msg

        return merged_result

    @classmethod
    def format_for_console(cls, data):
        """Format ``data`` with the io formatter module."""
        return io_formatter.format_results_for_console(data)