import time
import json
import os.path
import logging
import datetime

import paramiko

from wally.utils import (ssize2b, sec_to_str, StopTestError)

from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask,
                             reconnect)

from ..itest import IPerfTest, TestResults
from .formatter import format_results_for_console
from .fio_task_parser import (execution_time, fio_cfg_compile,
                              get_test_summary, FioJobSection)


logger = logging.getLogger("wally")


class IOTestResults(TestResults):
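    """Results of one fio job slice: keeps the parsed fio section, the
    per-job metrics, raw fio JSON output, run interval, node count and
    collected log files, and can be converted to/from a YAML-able dict."""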
    def summary(self):
        return get_test_summary(self.config) + "vm" + str(self.vm_count)

    def get_yamable(self):
        return {
            'type': "fio_test",
            'params': self.params,
            'config': (self.config.name, self.config.vals),
            'results': self.results,
            'raw_result': self.raw_result,
            'run_interval': self.run_interval,
            'vm_count': self.vm_count,
            'test_name': self.test_name,
            'files': self.files
        }

    @classmethod
    def from_yaml(cls, data):
        name, vals = data['config']
        sec = FioJobSection(name)
        sec.vals = vals

        return cls(sec, data['params'], data['results'],
                   data['raw_result'], data['run_interval'],
                   data['vm_count'], data['test_name'],
                   files=data.get('files', {}))


def get_slice_parts_offset(test_slice, real_inteval):
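    """Yield an (offset_start, offset_end) pair for every section in
    test_slice, scaling the calculated per-section execution times so that
    together they exactly cover the measured real_inteval interval."""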
    calc_exec_time = sum(map(execution_time, test_slice))
    coef = (real_inteval[1] - real_inteval[0]) / calc_exec_time
    curr_offset = real_inteval[0]
    for section in test_slice:
        slen = execution_time(section) * coef
        yield (curr_offset, curr_offset + slen)
        curr_offset += slen


class IOPerfTest(IPerfTest):
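    """fio-based IO load test: compiles fio job configs from a .cfg file,
    prefills the target files, runs fio slices on the remote node over SSH
    and collects JSON results plus optional latency/iops logs."""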
    tcp_conn_timeout = 30
    max_pig_timeout = 5
    soft_runcycle = 5 * 60

    def __init__(self, *dt, **mp):
        IPerfTest.__init__(self, *dt, **mp)
        self.config_fname = self.options['cfg']

        if '/' not in self.config_fname and '.' not in self.config_fname:
            cfgs_dir = os.path.dirname(__file__)
            self.config_fname = os.path.join(cfgs_dir,
                                             self.config_fname + '.cfg')

        self.alive_check_interval = self.options.get('alive_check_interval')

        self.config_params = self.options.get('params', {}).copy()
        self.tool = self.options.get('tool', 'fio')

        self.io_py_remote = self.join_remote("agent.py")
        self.results_file = self.join_remote("results.json")
        self.pid_file = self.join_remote("pid")
        self.task_file = self.join_remote("task.cfg")
        self.sh_file = self.join_remote("cmd.sh")
        self.err_out_file = self.join_remote("fio_err_out")
        self.exit_code_file = self.join_remote("exit_code")
        self.use_sudo = self.options.get("use_sudo", True)
        self.test_logging = self.options.get("test_logging", False)
        self.raw_cfg = open(self.config_fname).read()
        self.fio_configs = fio_cfg_compile(self.raw_cfg,
                                           self.config_fname,
                                           self.config_params,
                                           split_on_names=self.test_logging)
        self.fio_configs = list(self.fio_configs)

    def __str__(self):
        return "{0}({1})".format(self.__class__.__name__,
                                 self.node.get_conn_id())

    @classmethod
    def load(cls, data):
        return IOTestResults.from_yaml(data)

    def cleanup(self):
        # delete_file(conn, self.io_py_remote)
        # Need to remove temp files used for testing
        pass

    def prefill_test_files(self):
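        """Sequentially write every file referenced by the job sections up to
        its largest requested size with fio, then report the achieved fill
        bandwidth via self.coordinate()."""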
        files = {}
        for cfg_slice in self.fio_configs:
            for section in cfg_slice:
                sz = ssize2b(section.vals['size'])
                msz = sz / (1024 ** 2)

                if sz % (1024 ** 2) != 0:
                    msz += 1

                fname = section.vals['filename']

                # if another test already uses the same file name,
                # take the largest size
                files[fname] = max(files.get(fname, 0), msz)

        cmd_templ = "fio --name=xxx --filename={0} --direct=1" + \
                    " --bs=4m --size={1}m --rw=write"

        if self.use_sudo:
            cmd_templ = "sudo " + cmd_templ

        ssize = 0
        stime = time.time()

        for fname, curr_sz in files.items():
            cmd = cmd_templ.format(fname, curr_sz)
            ssize += curr_sz
            self.run_over_ssh(cmd, timeout=curr_sz)

        # if self.use_sudo:
        #     self.run_over_ssh("sudo echo 3 > /proc/sys/vm/drop_caches",
        #                       timeout=5)
        # else:
        #     logging.warning("Can't flush caches as sudo is disabled")

        ddtime = time.time() - stime
        if ddtime > 1E-3:
            fill_bw = int(ssize / ddtime)
            mess = "Initial dd fill bw is {0} MiBps for this vm"
            logger.info(mess.format(fill_bw))
            self.coordinate(('init_bw', fill_bw))

    def install_utils(self, max_retry=3, timeout=5):
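        """Install fio and screen with apt-get on the remote node if they
        are missing, retrying up to max_retry times and sleeping `timeout`
        seconds between attempts."""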
        need_install = []
        for bin_name, package in (('fio', 'fio'), ('screen', 'screen')):
            try:
                self.run_over_ssh('which ' + bin_name, nolog=True)
            except OSError:
                need_install.append(package)

        if len(need_install) == 0:
            return

        cmd = "sudo apt-get -y install " + " ".join(need_install)

        for i in range(max_retry):
            try:
                self.run_over_ssh(cmd)
                break
            except OSError as err:
                time.sleep(timeout)
        else:
            raise OSError("Can't install - " + str(err))

    def pre_run(self):
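        """Prepare the remote node: create the working directory (via sudo
        plus chown when use_sudo is set), install missing utilities and,
        unless the 'prefill_files' option disables it, prefill test files."""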
        try:
            cmd = 'mkdir -p "{0}"'.format(self.remote_dir)
            if self.use_sudo:
                cmd = "sudo " + cmd
                cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
                                                      self.remote_dir)

            self.run_over_ssh(cmd)
        except Exception as exc:
            msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
            msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
            logger.exception(msg)
            raise StopTestError(msg, exc)

        self.install_utils()

        if self.options.get('prefill_files', True):
            self.prefill_test_files()
        elif self.is_primary:
            logger.warning("Prefilling of test files is disabled")

    def run(self, barrier):
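        """Run all compiled fio config slices in order, retrying a failed
        slice only when this is the sole test node, then parse each slice's
        JSON output into an IOTestResults object and hand it to
        on_result_cb()."""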
        try:
            if len(self.fio_configs) > 1 and self.is_primary:

                exec_time = 0
                for test_slice in self.fio_configs:
                    exec_time += sum(map(execution_time, test_slice))

                # +10% is a rough estimate for additional operations
                # like sftp, etc
                exec_time = int(exec_time * 1.1)

                exec_time_s = sec_to_str(exec_time)
                now_dt = datetime.datetime.now()
                end_dt = now_dt + datetime.timedelta(0, exec_time)
                msg = "Entire test should take around {0} and finish at {1}"
                logger.info(msg.format(exec_time_s,
                                       end_dt.strftime("%H:%M:%S")))

            for pos, fio_cfg_slice in enumerate(self.fio_configs):
                fio_cfg_slice = list(fio_cfg_slice)
                names = [i.name for i in fio_cfg_slice]
                msgs = []
                already_processed = set()
                for name in names:
                    if name not in already_processed:
                        already_processed.add(name)

                        if 1 == names.count(name):
                            msgs.append(name)
                        else:
                            frmt = "{0} * {1}"
                            msgs.append(frmt.format(name,
                                                    names.count(name)))

                if self.is_primary:
                    logger.info("Will run tests: " + ", ".join(msgs))

                nolog = (pos != 0) or not self.is_primary

                max_retr = 3 if self.total_nodes_count == 1 else 1

                for idx in range(max_retr):
                    try:
                        out_err, interval, files = self.do_run(barrier, fio_cfg_slice, pos,
                                                               nolog=nolog)
                        break
                    except Exception as exc:
                        logger.exception("During fio run")
                        if idx == max_retr - 1:
                            raise StopTestError("Fio failed", exc)
                        logger.info("Sleeping 30s and retrying")
                        time.sleep(30)

                try:
                    # HACK
                    out_err = "{" + out_err.split("{", 1)[1]
                    full_raw_res = json.loads(out_err)

                    res = {"bw": [], "iops": [], "lat": [],
                           "clat": [], "slat": []}

                    for raw_result in full_raw_res['jobs']:
                        load_data = raw_result['mixed']

                        res["bw"].append(load_data["bw"])
                        res["iops"].append(load_data["iops"])
                        res["lat"].append(load_data["lat"]["mean"])
                        res["clat"].append(load_data["clat"]["mean"])
                        res["slat"].append(load_data["slat"]["mean"])

                    first = fio_cfg_slice[0]
                    p1 = first.vals.copy()
                    p1.pop('ramp_time', 0)
                    p1.pop('offset', 0)

                    for nxt in fio_cfg_slice[1:]:
                        assert nxt.name == first.name
                        p2 = nxt.vals
                        p2.pop('_ramp_time', 0)
                        p2.pop('offset', 0)
                        assert p1 == p2

                    tname = os.path.basename(self.config_fname)
                    if tname.endswith('.cfg'):
                        tname = tname[:-4]

                    tres = IOTestResults(first,
                                         self.config_params, res,
                                         full_raw_res, interval,
                                         test_name=tname,
                                         vm_count=self.total_nodes_count,
                                         files=files)
                    self.on_result_cb(tres)
                except StopTestError:
                    raise
                except Exception as exc:
                    msg_templ = "Error during postprocessing results"
                    logger.exception(msg_templ)
                    raise StopTestError(msg_templ, exc)

        finally:
            barrier.exit()

    def do_run(self, barrier, cfg_slice, pos, nolog=False):
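        """Run one slice of fio jobs on the remote node: upload the job file
        and a small bash wrapper, start it in the background through
        BGSSHTask, reconnect if the SSH session drops while waiting, then
        fetch the JSON results, exit code and any latency/iops logs."""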
        bash_file = "#!/bin/bash\n" + \
                    "fio --output-format=json --output={out_file} " + \
                    "--alloc-size=262144 {job_file} " + \
                    " >{err_out_file} 2>&1 \n" + \
                    "echo $? >{res_code_file}\n"

        conn_id = self.node.get_conn_id()
        fconn_id = conn_id.replace(":", "_")

        # cmd_templ = "fio --output-format=json --output={1} " + \
        #             "--alloc-size=262144 {0}"

        bash_file = bash_file.format(out_file=self.results_file,
                                     job_file=self.task_file,
                                     err_out_file=self.err_out_file,
                                     res_code_file=self.exit_code_file)

        task_fc = "\n\n".join(map(str, cfg_slice))
        with self.node.connection.open_sftp() as sftp:
            save_to_remote(sftp, self.task_file, task_fc)
            save_to_remote(sftp, self.sh_file, bash_file)

        fname = "{0}_{1}.fio".format(pos, fconn_id)
        with open(os.path.join(self.log_directory, fname), "w") as fd:
            fd.write(task_fc)

        exec_time = sum(map(execution_time, cfg_slice))
        exec_time_str = sec_to_str(exec_time)

        timeout = int(exec_time + max(300, exec_time))
        soft_tout = exec_time

        barrier.wait()

        if self.is_primary:
            templ = "Test should take about {0}." + \
338 " Should finish at {1}," + \
339 " will wait at most till {2}"
340 now_dt = datetime.datetime.now()
341 end_dt = now_dt + datetime.timedelta(0, exec_time)
342 wait_till = now_dt + datetime.timedelta(0, timeout)
343
344 logger.info(templ.format(exec_time_str,
345 end_dt.strftime("%H:%M:%S"),
346 wait_till.strftime("%H:%M:%S")))
347
koder aka kdanilovfd2cfa52015-05-20 03:17:42 +0300348 self.run_over_ssh("cd " + os.path.dirname(self.task_file), nolog=True)
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +0300349 task = BGSSHTask(self.node, self.options.get("use_sudo", True))
350 begin = time.time()
koder aka kdanilovbb5fe072015-05-21 02:50:23 +0300351
352 if self.options.get("use_sudo", True):
353 sudo = "sudo "
354 else:
355 sudo = ""
356
357 task.start(sudo + "bash " + self.sh_file)
358
359 while True:
360 try:
361 task.wait(soft_tout, timeout)
362 break
363 except paramiko.SSHException:
364 pass
365
366 try:
367 self.node.connection.close()
368 except:
369 pass
370
371 reconnect(self.node.connection, self.node.conn_url)
372
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +0300373 end = time.time()
374
375 if not nolog:
376 logger.debug("Test on node {0} is finished".format(conn_id))
377
koder aka kdanilovfd2cfa52015-05-20 03:17:42 +0300378 log_files = set()
379 for cfg in cfg_slice:
380 if 'write_lat_log' in cfg.vals:
381 fname = cfg.vals['write_lat_log']
382 log_files.add(fname + '_clat.log')
383 log_files.add(fname + '_lat.log')
384 log_files.add(fname + '_slat.log')
385
386 if 'write_iops_log' in cfg.vals:
387 fname = cfg.vals['write_iops_log']
388 log_files.add(fname + '_iops.log')
389
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +0300390 with self.node.connection.open_sftp() as sftp:
koder aka kdanilovfd2cfa52015-05-20 03:17:42 +0300391 result = read_from_remote(sftp, self.results_file)
koder aka kdanilovbb5fe072015-05-21 02:50:23 +0300392 exit_code = read_from_remote(sftp, self.exit_code_file)
393 err_out = read_from_remote(sftp, self.err_out_file)
394 exit_code = exit_code.strip()
395
396 if exit_code != '0':
                msg = "fio exited with code {0}: {1}".format(exit_code, err_out)
                logger.critical(msg.strip())
                raise StopTestError("fio failed")

            sftp.remove(self.results_file)
            sftp.remove(self.err_out_file)
            sftp.remove(self.exit_code_file)

            fname = "{0}_{1}.json".format(pos, fconn_id)
            with open(os.path.join(self.log_directory, fname), "w") as fd:
                fd.write(result)

            files = {}

            for fname in log_files:
                try:
                    fc = read_from_remote(sftp, fname)
                except:
                    continue
                sftp.remove(fname)
                ftype = fname.split('_')[-1].split(".")[0]
                loc_fname = "{0}_{1}_{2}.log".format(pos, fconn_id, ftype)
                files.setdefault(ftype, []).append(loc_fname)
                loc_path = os.path.join(self.log_directory, loc_fname)
                with open(loc_path, "w") as fd:
                    fd.write(fc)

        return result, (begin, end), files

    @classmethod
    def merge_results(cls, results):
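        """Merge raw fio results from several nodes that share the same
        '__meta__' section by concatenating their 'res' lists."""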
        merged = results[0]
        for block in results[1:]:
            assert block["__meta__"] == merged["__meta__"]
            merged['res'].extend(block['res'])
        return merged

    @classmethod
    def format_for_console(cls, data, dinfo):
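        """Render the processed results (dinfo) as a human-readable console
        report via format_results_for_console()."""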
        return format_results_for_console(dinfo)