blob: 8e82caee338c931247c198b3663d932c5cc6245a [file] [log] [blame]
koder aka kdanilove573c9c2015-06-02 08:57:48 +03001import re
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +03002import time
3import json
4import os.path
5import logging
6import datetime
7
koder aka kdanilovbb5fe072015-05-21 02:50:23 +03008import paramiko
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +03009
koder aka kdanilovbb5fe072015-05-21 02:50:23 +030010from wally.utils import (ssize2b, sec_to_str, StopTestError)
11
12from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask,
13 reconnect)
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030014
15from ..itest import IPerfTest, TestResults
16from .formatter import format_results_for_console
17from .fio_task_parser import (execution_time, fio_cfg_compile,
18 get_test_summary, FioJobSection)
19
20
# Module-level logger: every message emitted by this module goes to the
# "wally" logger.
logger = logging.getLogger("wally")
22
23
class IOTestResults(TestResults):
    """Results of one fio job slice, convertible to and from a plain dict.

    The fio section the results belong to lives in ``self.config``; in the
    serialized form it is stored as a ``(name, vals)`` pair.
    """

    def summary(self):
        """Return the fio test summary string followed by ``vm<count>``."""
        return "".join([get_test_summary(self.config),
                        "vm",
                        str(self.vm_count)])

    def get_yamable(self):
        """Return a dict of plain values suitable for YAML dumping.

        ``from_yaml`` performs the inverse conversion.
        """
        cfg = self.config
        yamable = dict(type="fio_test",
                       params=self.params,
                       config=(cfg.name, cfg.vals),
                       results=self.results,
                       raw_result=self.raw_result,
                       run_interval=self.run_interval,
                       vm_count=self.vm_count,
                       test_name=self.test_name,
                       files=self.files)
        return yamable

    @classmethod
    def from_yaml(cls, data):
        """Rebuild an instance from a dict produced by ``get_yamable``.

        If ``data`` has no ``'files'`` key an empty dict is passed instead.
        """
        sec_name, sec_vals = data['config']
        section = FioJobSection(sec_name)
        section.vals = sec_vals

        positional = [data[key] for key in ('params', 'results',
                                            'raw_result', 'run_interval',
                                            'vm_count', 'test_name')]
        return cls(section, *positional, files=data.get('files', {}))
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030051
52
def get_slice_parts_offset(test_slice, real_inteval):
    """Split a measured run interval between the sections of a fio slice.

    Each section gets a contiguous share of ``real_inteval`` proportional to
    its estimated ``execution_time``; together the shares cover the whole
    interval.

    :param test_slice: iterable of fio job sections that ran together
        (any iterable, including a one-shot generator).
    :param real_inteval: ``(begin, end)`` pair of the measured interval.
    :yields: one ``(start, stop)`` pair per section, in slice order.
        Nothing is yielded for an empty slice.
    :raises ZeroDivisionError: if the slice is non-empty but its total
        estimated execution time is zero.
    """
    # Materialize once: the sections are walked twice below, which would
    # silently produce nothing for a generator argument.
    sections = list(test_slice)
    if not sections:
        return

    begin, end = real_inteval[0], real_inteval[1]
    calc_exec_time = sum(map(execution_time, sections))

    # float() keeps the ratio exact when the bounds and the estimate are
    # all ints (``/`` truncates integers under Python 2).
    coef = float(end - begin) / calc_exec_time

    curr_offset = begin
    for section in sections:
        slen = execution_time(section) * coef
        yield (curr_offset, curr_offset + slen)
        curr_offset += slen
61
62
class IOPerfTest(IPerfTest):
    """Run compiled fio job slices on a remote node and collect the output.

    The fio config named by ``options['cfg']`` is compiled into a list of
    slices. Each slice is uploaded as one fio job file together with a small
    wrapper script, started with ``BGSSHTask``, and its JSON output plus any
    latency/iops/bandwidth logs are copied into ``self.log_directory``.
    """

    tcp_conn_timeout = 30
    max_pig_timeout = 5
    soft_runcycle = 5 * 60

    def __init__(self, *dt, **mp):
        IPerfTest.__init__(self, *dt, **mp)
        self.config_fname = self.options['cfg']

        # A name containing neither '/' nor '.' refers to a bundled
        # "<name>.cfg" file located next to this module.
        if '/' not in self.config_fname and '.' not in self.config_fname:
            cfgs_dir = os.path.dirname(__file__)
            self.config_fname = os.path.join(cfgs_dir,
                                             self.config_fname + '.cfg')

        self.alive_check_interval = self.options.get('alive_check_interval')

        self.config_params = self.options.get('params', {}).copy()
        self.tool = self.options.get('tool', 'fio')

        # Remote paths used while running a slice.
        self.io_py_remote = self.join_remote("agent.py")
        self.results_file = self.join_remote("results.json")
        self.pid_file = self.join_remote("pid")
        self.task_file = self.join_remote("task.cfg")
        self.sh_file = self.join_remote("cmd.sh")
        self.err_out_file = self.join_remote("fio_err_out")
        self.exit_code_file = self.join_remote("exit_code")
        self.use_sudo = self.options.get("use_sudo", True)
        self.test_logging = self.options.get("test_logging", False)

        # Close the config file deterministically instead of leaking the
        # handle until garbage collection.
        with open(self.config_fname) as cfg_fd:
            self.raw_cfg = cfg_fd.read()

        compiled = fio_cfg_compile(self.raw_cfg,
                                   self.config_fname,
                                   self.config_params,
                                   split_on_names=self.test_logging)
        self.fio_configs = list(compiled)

    def __str__(self):
        return "{0}({1})".format(self.__class__.__name__,
                                 self.node.get_conn_id())

    @classmethod
    def load(cls, data):
        """Deserialize results previously produced by this test."""
        return IOTestResults.from_yaml(data)

    def cleanup(self):
        # delete_file(conn, self.io_py_remote)
        # Need to remove tempo files, used for testing
        pass

    def prefill_test_files(self):
        """Fill every test file referenced by the fio config with data.

        Each distinct ``filename`` is written once with fio (4 MiB direct
        sequential writes), sized to the largest ``size`` any section
        requests for it, rounded up to whole MiB. The observed fill
        bandwidth is logged and passed to ``self.coordinate``.
        """
        mib = 1024 ** 2
        files = {}
        for cfg_slice in self.fio_configs:
            for section in cfg_slice:
                sz = ssize2b(section.vals['size'])

                # Round up to whole MiB. ``//`` keeps this an integer on
                # both Python 2 and 3 (``/`` gives a float on Python 3,
                # which would produce e.g. "--size=2.5m").
                msz = sz // mib
                if sz % mib != 0:
                    msz += 1

                fname = section.vals['filename']

                # if already has other test with the same file name
                # take largest size
                files[fname] = max(files.get(fname, 0), msz)

        cmd_templ = "fio --name=xxx --filename={0} --direct=1" + \
                    " --bs=4m --size={1}m --rw=write"

        if self.use_sudo:
            cmd_templ = "sudo " + cmd_templ

        ssize = 0
        stime = time.time()

        for fname, curr_sz in files.items():
            cmd = cmd_templ.format(fname, curr_sz)
            ssize += curr_sz
            # Timeout equals the file size in MiB (assumes run_over_ssh
            # takes its timeout in seconds - TODO confirm).
            self.run_over_ssh(cmd, timeout=curr_sz)

        ddtime = time.time() - stime
        if ddtime > 1E-3:
            fill_bw = int(ssize / ddtime)
            mess = "Initiall dd fill bw is {0} MiBps for this vm"
            logger.info(mess.format(fill_bw))
            self.coordinate(('init_bw', fill_bw))

    def install_utils(self, max_retry=3, timeout=5):
        """Make sure ``fio`` and ``screen`` are available on the node.

        Binaries not found by ``which`` are installed with
        ``sudo apt-get -y install``; the install is tried up to
        ``max_retry`` times, sleeping ``timeout`` seconds after each
        failure.

        :raises OSError: if every install attempt failed.
        """
        need_install = []
        for bin_name, package in (('fio', 'fio'), ('screen', 'screen')):
            try:
                self.run_over_ssh('which ' + bin_name, nolog=True)
            except OSError:
                need_install.append(package)

        if not need_install:
            return

        cmd = "sudo apt-get -y install " + " ".join(need_install)

        # Python 3 unbinds the ``except ... as`` name when the handler ends,
        # so the last error is kept in a separate variable for the message.
        last_err = None
        for _ in range(max_retry):
            try:
                self.run_over_ssh(cmd)
                return
            except OSError as err:
                last_err = err
                time.sleep(timeout)

        raise OSError("Can't install - " + str(last_err))

    def pre_run(self):
        """Prepare the node before the first run.

        Creates ``self.remote_dir`` (with sudo, followed by a chown to the
        node's user, when ``use_sudo`` is set), installs missing tools and,
        unless ``options['prefill_files']`` is false, prefills test files.

        :raises StopTestError: if the remote directory can't be created.
        """
        try:
            cmd = 'mkdir -p "{0}"'.format(self.remote_dir)
            if self.use_sudo:
                cmd = "sudo " + cmd
                # Quote the path exactly as the mkdir above does.
                cmd += ' ; sudo chown {0} "{1}"'.format(self.node.get_user(),
                                                       self.remote_dir)

            self.run_over_ssh(cmd)
        except Exception as exc:
            msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
            msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
            logger.exception(msg)
            raise StopTestError(msg, exc)

        self.install_utils()

        if self.options.get('prefill_files', True):
            self.prefill_test_files()
        elif self.is_primary:
            logger.warning("Prefilling of test files is disabled")

    def run(self, barrier):
        """Run every compiled fio slice and report each slice's result.

        For each slice fio is run via ``do_run`` (see
        ``_run_with_retries``), its JSON output is turned into an
        ``IOTestResults`` and passed to ``self.on_result_cb``.
        ``barrier.exit()`` is always called on the way out.

        :raises StopTestError: if fio fails or its output can't be processed.
        """
        try:
            if len(self.fio_configs) > 1 and self.is_primary:
                self._log_total_time_estimate()

            for pos, fio_cfg_slice in enumerate(self.fio_configs):
                fio_cfg_slice = list(fio_cfg_slice)

                if self.is_primary:
                    names = [i.name for i in fio_cfg_slice]
                    msgs = self._describe_test_names(names)
                    logger.info("Will run tests: " + ", ".join(msgs))

                nolog = (pos != 0) or not self.is_primary

                out_err, interval, files = self._run_with_retries(
                    barrier, fio_cfg_slice, pos, nolog)

                try:
                    self._report_results(fio_cfg_slice, out_err,
                                         interval, files)
                except StopTestError:
                    raise
                except Exception as exc:
                    # Include the actual error; the old template had no
                    # placeholder, so the exception text was dropped.
                    msg_templ = "Error during postprocessing results: {0!s}"
                    msg = msg_templ.format(exc)
                    logger.exception(msg)
                    raise StopTestError(msg, exc)
        finally:
            barrier.exit()

    def _log_total_time_estimate(self):
        """Log the expected duration and finish time of all slices."""
        exec_time = 0
        for test_slice in self.fio_configs:
            exec_time += sum(map(execution_time, test_slice))

        # +10% - is a rough estimation for additional operations
        # like sftp, etc
        exec_time = int(exec_time * 1.1)

        exec_time_s = sec_to_str(exec_time)
        now_dt = datetime.datetime.now()
        end_dt = now_dt + datetime.timedelta(0, exec_time)
        msg = "Entire test should takes aroud: {0} and finished at {1}"
        logger.info(msg.format(exec_time_s, end_dt.strftime("%H:%M:%S")))

    @staticmethod
    def _describe_test_names(names):
        """Collapse repeated names, e.g. ['a', 'a', 'b'] -> ['a * 2', 'b'].

        Names keep the order of their first appearance.
        """
        counts = {}
        ordered = []
        for name in names:
            if name not in counts:
                counts[name] = 0
                ordered.append(name)
            counts[name] += 1

        msgs = []
        for name in ordered:
            if counts[name] == 1:
                msgs.append(name)
            else:
                msgs.append("{0} * {1}".format(name, counts[name]))
        return msgs

    def _run_with_retries(self, barrier, fio_cfg_slice, pos, nolog):
        """Call ``do_run`` for one slice, retrying on failure.

        Up to 3 attempts, 30 s apart, are made when this test runs on a
        single node; with several nodes one failure is final.
        NOTE(review): presumably because ``do_run`` waits on the shared
        barrier, which a lone retrying node could not pass - confirm.

        :returns: the ``(output, (begin, end), files)`` tuple of ``do_run``.
        :raises StopTestError: once the last attempt has failed.
        """
        max_retr = 3 if self.total_nodes_count == 1 else 1

        for idx in range(max_retr):
            try:
                return self.do_run(barrier, fio_cfg_slice, pos, nolog=nolog)
            except Exception as exc:
                logger.exception("During fio run")
                if idx == max_retr - 1:
                    raise StopTestError("Fio failed", exc)
                logger.info("Sleeping 30s and retrying")
                time.sleep(30)

    def _report_results(self, fio_cfg_slice, out_err, interval, files):
        """Parse fio JSON output for a slice and pass it to on_result_cb."""
        # HACK: keep everything from the first '{' on; presumably fio can
        # print non-JSON text before the JSON document.
        out_err = "{" + out_err.split("{", 1)[1]
        full_raw_res = json.loads(out_err)

        res = {"bw": [],
               "iops": [],
               "lat": [],
               "clat": [],
               "slat": []}

        for raw_result in full_raw_res['jobs']:
            load_data = raw_result['mixed']

            res["bw"].append(load_data["bw"])
            res["iops"].append(load_data["iops"])
            res["lat"].append(load_data["lat"]["mean"])
            res["clat"].append(load_data["clat"]["mean"])
            res["slat"].append(load_data["slat"]["mean"])

        first = fio_cfg_slice[0]
        self._check_same_params(first, fio_cfg_slice[1:])

        tname = os.path.basename(self.config_fname)
        if tname.endswith('.cfg'):
            tname = tname[:-4]

        tres = IOTestResults(first,
                             self.config_params, res,
                             full_raw_res, interval,
                             test_name=tname,
                             vm_count=self.total_nodes_count,
                             files=files)
        self.on_result_cb(tres)

    @staticmethod
    def _check_same_params(first, others):
        """Assert every section in ``others`` repeats ``first``.

        Names must match. Options must match except ``ramp_time``,
        ``_ramp_time`` and ``offset``, which are dropped from both sides.
        Copies are compared, so the sections' own ``vals`` (shared with
        ``self.fio_configs``) are not modified.
        """
        ignored = ('ramp_time', '_ramp_time', 'offset')

        def comparable(section):
            vals = section.vals.copy()
            for key in ignored:
                vals.pop(key, None)
            return vals

        p1 = comparable(first)
        for nxt in others:
            assert nxt.name == first.name
            assert p1 == comparable(nxt)

    def do_run(self, barrier, cfg_slice, pos, nolog=False):
        """Run one fio slice on the node and fetch its output.

        Uploads the fio job file and a wrapper shell script, waits on
        ``barrier``, starts the script with ``BGSSHTask`` and waits for it
        (reconnecting after SSH errors), then downloads fio's JSON output
        and any new latency/iops/bw log files into ``self.log_directory``.

        :returns: ``(json_output, (begin, end), files)`` where ``begin`` and
            ``end`` are ``time.time()`` stamps and ``files`` maps a log type
            (e.g. ``'clat'``) to a list of local file names.
        :raises StopTestError: if fio exits with a non-zero code.
        """
        exec_folder = os.path.dirname(self.task_file)
        bash_file = "#!/bin/bash\n" + \
                    "cd {exec_folder}\n" + \
                    "fio --output-format=json --output={out_file} " + \
                    "--alloc-size=262144 {job_file} " + \
                    " >{err_out_file} 2>&1 \n" + \
                    "echo $? >{res_code_file}\n"

        conn_id = self.node.get_conn_id()
        fconn_id = conn_id.replace(":", "_")

        bash_file = bash_file.format(out_file=self.results_file,
                                     job_file=self.task_file,
                                     err_out_file=self.err_out_file,
                                     res_code_file=self.exit_code_file,
                                     exec_folder=exec_folder)

        task_fc = "\n\n".join(map(str, cfg_slice))
        with self.node.connection.open_sftp() as sftp:
            save_to_remote(sftp, self.task_file, task_fc)
            save_to_remote(sftp, self.sh_file, bash_file)

        fname = "{0}_{1}.fio".format(pos, fconn_id)
        with open(os.path.join(self.log_directory, fname), "w") as fd:
            fd.write(task_fc)

        exec_time = sum(map(execution_time, cfg_slice))
        exec_time_str = sec_to_str(exec_time)

        # Hard limit: the estimate plus at least 300 s of slack (seconds).
        timeout = int(exec_time + max(300, exec_time))
        soft_tout = exec_time

        barrier.wait()

        if self.is_primary:
            templ = "Test should takes about {0}." + \
                    " Should finish at {1}," + \
                    " will wait at most till {2}"
            now_dt = datetime.datetime.now()
            end_dt = now_dt + datetime.timedelta(0, exec_time)
            wait_till = now_dt + datetime.timedelta(0, timeout)

            logger.info(templ.format(exec_time_str,
                                     end_dt.strftime("%H:%M:%S"),
                                     wait_till.strftime("%H:%M:%S")))

        # Use the value read once in __init__ rather than re-reading options.
        task = BGSSHTask(self.node, self.use_sudo)
        begin = time.time()
        sudo = "sudo " if self.use_sudo else ""

        fnames_before = self.run_over_ssh("ls -1 " + exec_folder, nolog=True)
        task.start(sudo + "bash " + self.sh_file)

        while True:
            try:
                task.wait(soft_tout, timeout)
                break
            except paramiko.SSHException as exc:
                logger.warning("SSH error while waiting for fio on {0}: "
                               "{1!s}; reconnecting".format(conn_id, exc))

            # Best effort: the connection is replaced right below anyway.
            try:
                self.node.connection.close()
            except Exception:
                pass

            reconnect(self.node.connection, self.node.conn_url)

        end = time.time()
        fnames_after = self.run_over_ssh("ls -1 " + exec_folder, nolog=True)

        new_files = set(fnames_after.split()) - set(fnames_before.split())

        if not nolog:
            logger.debug("Test on node {0} is finished".format(conn_id))

        log_files_re = self._log_file_patterns(cfg_slice)
        # Sorted so that local names assigned below are deterministic.
        log_files = sorted(
            fname for fname in new_files
            if any(re.match(rexpr + "$", fname) for rexpr in log_files_re))

        with self.node.connection.open_sftp() as sftp:
            # Check the exit code before reading fio's output: if fio
            # failed the results file may not exist, and reading it first
            # would raise before the error output is reported.
            exit_code = read_from_remote(sftp, self.exit_code_file).strip()
            err_out = read_from_remote(sftp, self.err_out_file)

            if exit_code != '0':
                msg = "fio exit with code {0}: {1}".format(exit_code, err_out)
                logger.critical(msg.strip())
                raise StopTestError("fio failed")

            result = read_from_remote(sftp, self.results_file)

            sftp.remove(self.results_file)
            sftp.remove(self.err_out_file)
            sftp.remove(self.exit_code_file)

            fname = "{0}_{1}.json".format(pos, fconn_id)
            with open(os.path.join(self.log_directory, fname), "w") as fd:
                fd.write(result)

            files = self._fetch_log_files(sftp, exec_folder, log_files,
                                          pos, fconn_id)

        return result, (begin, end), files

    @staticmethod
    def _log_file_patterns(cfg_slice):
        """Return regexes for the log files the slice's sections request.

        Covers ``write_lat_log`` (clat/lat/slat), ``write_iops_log`` and
        ``write_bw_log``. The configured prefix is escaped so it is matched
        literally rather than interpreted as a regex.
        """
        log_kinds = (('write_lat_log', ('_clat', '_lat', '_slat')),
                     ('write_iops_log', ('_iops',)),
                     ('write_bw_log', ('_bw',)))

        patterns = set()
        for cfg in cfg_slice:
            for opt, suffixes in log_kinds:
                if opt in cfg.vals:
                    prefix = re.escape(str(cfg.vals[opt]))
                    for suffix in suffixes:
                        patterns.add(prefix + suffix + '.*log')
        return patterns

    def _fetch_log_files(self, sftp, exec_folder, log_files, pos, fconn_id):
        """Move fio log files from the node into ``self.log_directory``.

        Files that can't be read are logged and skipped.

        :returns: dict mapping log type (e.g. ``'clat'``) to a list of local
            file names.
        """
        files = {}
        for fname in log_files:
            rpath = os.path.join(exec_folder, fname)

            try:
                fc = read_from_remote(sftp, rpath)
            except Exception as exc:
                msg = "Can't read file {0} from remote: {1}".format(rpath, exc)
                logger.error(msg)
                continue

            sftp.remove(rpath)
            ftype = fname.split('_')[-1].split(".")[0]

            # fio can write one log per job (e.g. foo_clat.1.log and
            # foo_clat.2.log). Give each extra log of a type its own local
            # name instead of overwriting the first one.
            same_type = files.setdefault(ftype, [])
            if same_type:
                loc_fname = "{0}_{1}_{2}.{3}.log".format(pos, fconn_id, ftype,
                                                        len(same_type))
            else:
                loc_fname = "{0}_{1}_{2}.log".format(pos, fconn_id, ftype)
            same_type.append(loc_fname)

            loc_path = os.path.join(self.log_directory, loc_fname)
            with open(loc_path, "w") as fd:
                fd.write(fc)

        return files

    @classmethod
    def merge_results(cls, results):
        """Concatenate the ``res`` lists of blocks sharing one ``__meta__``.

        A shallow copy of the first block (with its own ``res`` list) is
        returned; none of the input blocks are modified.
        """
        merged = dict(results[0])
        merged['res'] = list(merged['res'])
        for block in results[1:]:
            assert block["__meta__"] == merged["__meta__"]
            merged['res'].extend(block['res'])
        return merged

    @classmethod
    def format_for_console(cls, data, dinfo):
        return format_results_for_console(dinfo)