blob: 978fa468f09c27da4d94a5699928e888dbe7bb49 [file] [log] [blame]
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +03001import time
2import json
3import os.path
4import logging
5import datetime
6
koder aka kdanilovbb5fe072015-05-21 02:50:23 +03007import paramiko
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +03008
koder aka kdanilovbb5fe072015-05-21 02:50:23 +03009from wally.utils import (ssize2b, sec_to_str, StopTestError)
10
11from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask,
12 reconnect)
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030013
14from ..itest import IPerfTest, TestResults
15from .formatter import format_results_for_console
16from .fio_task_parser import (execution_time, fio_cfg_compile,
17 get_test_summary, FioJobSection)
18
19
20logger = logging.getLogger("wally")
21
22
class IOTestResults(TestResults):
    """Result of one fio test run, convertible to/from a yaml-able dict."""

    def summary(self):
        """Short human-readable test id: fio summary plus vm-count suffix."""
        return "{0}vm{1}".format(get_test_summary(self.config), self.vm_count)

    def get_yamable(self):
        """Return a plain-data dict representation suitable for yaml dump."""
        yamable = {'type': "fio_test",
                   'config': (self.config.name, self.config.vals)}
        for attr in ('params', 'results', 'raw_result', 'run_interval',
                     'vm_count', 'test_name'):
            yamable[attr] = getattr(self, attr)
        return yamable

    @classmethod
    def from_yaml(cls, data):
        """Rebuild an instance from a dict produced by get_yamable()."""
        sec_name, sec_vals = data['config']
        section = FioJobSection(sec_name)
        section.vals = sec_vals

        rest = [data[field] for field in ('params', 'results', 'raw_result',
                                          'run_interval', 'vm_count',
                                          'test_name')]
        return cls(section, *rest)
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030048
49
def get_slice_parts_offset(test_slice, real_inteval):
    """Yield a (begin, end) wall-clock interval for every section.

    The configured execution times of the sections are scaled so that
    together they exactly cover the measured interval `real_inteval`
    (a (start, stop) pair).
    """
    begin, finish = real_inteval[0], real_inteval[1]
    planned_total = sum(execution_time(sec) for sec in test_slice)
    scale = (finish - begin) / planned_total

    offset = begin
    for sec in test_slice:
        duration = execution_time(sec) * scale
        yield (offset, offset + duration)
        offset += duration
58
59
class IOPerfTest(IPerfTest):
    """Drive fio-based IO load tests on a remote node over ssh.

    Compiles a fio job file from a local .cfg template, uploads it with a
    wrapper shell script, runs fio in the background (surviving ssh
    disconnects by reconnecting) and pulls the json results plus optional
    latency/iops log files back into self.log_directory.
    """

    tcp_conn_timeout = 30
    max_pig_timeout = 5
    soft_runcycle = 5 * 60  # seconds

    def __init__(self, *dt, **mp):
        IPerfTest.__init__(self, *dt, **mp)
        self.config_fname = self.options['cfg']

        # a bare name (no path, no extension) refers to one of the .cfg
        # templates shipped next to this module
        if '/' not in self.config_fname and '.' not in self.config_fname:
            cfgs_dir = os.path.dirname(__file__)
            self.config_fname = os.path.join(cfgs_dir,
                                             self.config_fname + '.cfg')

        self.alive_check_interval = self.options.get('alive_check_interval')

        self.config_params = self.options.get('params', {}).copy()
        self.tool = self.options.get('tool', 'fio')

        # remote paths of every file exchanged with the test node
        self.io_py_remote = self.join_remote("agent.py")
        self.results_file = self.join_remote("results.json")
        self.pid_file = self.join_remote("pid")
        self.task_file = self.join_remote("task.cfg")
        self.sh_file = self.join_remote("cmd.sh")
        self.err_out_file = self.join_remote("fio_err_out")
        self.exit_code_file = self.join_remote("exit_code")
        self.use_sudo = self.options.get("use_sudo", True)
        self.test_logging = self.options.get("test_logging", False)

        # fix: close the config file instead of leaking the handle
        with open(self.config_fname) as fd:
            self.raw_cfg = fd.read()

        self.fio_configs = fio_cfg_compile(self.raw_cfg,
                                           self.config_fname,
                                           self.config_params,
                                           split_on_names=self.test_logging)
        self.fio_configs = list(self.fio_configs)

    def __str__(self):
        return "{0}({1})".format(self.__class__.__name__,
                                 self.node.get_conn_id())

    @classmethod
    def load(cls, data):
        """Recreate a result object from its yaml-able dict."""
        return IOTestResults.from_yaml(data)

    def cleanup(self):
        # remote temporary files are deliberately kept - they are reused
        # between runs and useful when debugging
        pass

    def prefill_test_files(self):
        """Pre-write every data file the fio jobs will touch, using dd.

        Writing the files beforehand keeps allocation/thin-provisioning
        effects out of the measurement.  Reports the achieved fill
        bandwidth to peers via self.coordinate().
        """
        # file name -> required size in MiB; when several sections share
        # a file, keep the largest size
        files = {}
        for cfg_slice in self.fio_configs:
            for section in cfg_slice:
                sz = ssize2b(section.vals['size'])
                # size in MiB, rounded up ('//' keeps the math integral
                # on python3 as well)
                msz = sz // (1024 ** 2)

                if sz % (1024 ** 2) != 0:
                    msz += 1

                fname = section.vals['filename']
                files[fname] = max(files.get(fname, 0), msz)

        cmd_templ = "dd oflag=direct " + \
                    "if=/dev/zero of={0} bs={1} count={2}"

        if self.use_sudo:
            cmd_templ = "sudo " + cmd_templ

        ssize = 0
        stime = time.time()

        for fname, curr_sz in files.items():
            cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
            ssize += curr_sz
            # assume >= 1 MiBps write speed, so size in MiB doubles as
            # the per-command ssh timeout in seconds
            self.run_over_ssh(cmd, timeout=curr_sz)

        ddtime = time.time() - stime
        if ddtime > 1E-3:
            fill_bw = int(ssize / ddtime)
            mess = "Initial dd fill bw is {0} MiBps for this vm"
            logger.info(mess.format(fill_bw))
            self.coordinate(('init_bw', fill_bw))

    def install_utils(self, max_retry=3, timeout=5):
        """Ensure fio and screen are present on the node (apt-get based).

        Retries installation up to max_retry times, sleeping `timeout`
        seconds after each failed attempt; raises OSError when all
        attempts fail.
        """
        need_install = []
        for bin_name, package in (('fio', 'fio'), ('screen', 'screen')):
            try:
                self.run_over_ssh('which ' + bin_name, nolog=True)
            except OSError:
                need_install.append(package)

        if len(need_install) == 0:
            return

        cmd = "sudo apt-get -y install " + " ".join(need_install)

        # fix: keep the last error in a name that survives the except
        # block (python3 deletes the 'as' target when the block exits)
        last_err = None
        for _ in range(max_retry):
            try:
                self.run_over_ssh(cmd)
                return
            except OSError as err:
                last_err = err
                time.sleep(timeout)
        raise OSError("Can't install - " + str(last_err))

    def pre_run(self):
        """Prepare the node: working dir, tool install, file prefill."""
        try:
            cmd = 'mkdir -p "{0}"'.format(self.remote_dir)
            if self.use_sudo:
                cmd = "sudo " + cmd
                # dir was created by root - hand it back to the test user
                cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
                                                      self.remote_dir)

            self.run_over_ssh(cmd)
        except Exception as exc:
            msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
            msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
            logger.exception(msg)
            raise StopTestError(msg, exc)

        self.install_utils()

        if self.options.get('prefill_files', True):
            self.prefill_test_files()
        elif self.is_primary:
            logger.warning("Prefilling of test files is disabled")

    def run(self, barrier):
        """Execute every compiled fio config slice and post-process results.

        `barrier` synchronizes all per-node runners; it is released in
        the finally block so peers are never left waiting.
        """
        try:
            if len(self.fio_configs) > 1 and self.is_primary:

                exec_time = 0
                for test_slice in self.fio_configs:
                    exec_time += sum(map(execution_time, test_slice))

                # +10% - is a rough estimation for additional operations
                # like sftp, etc
                exec_time = int(exec_time * 1.1)

                exec_time_s = sec_to_str(exec_time)
                now_dt = datetime.datetime.now()
                end_dt = now_dt + datetime.timedelta(0, exec_time)
                msg = "Entire test should take around: {0} and finish at {1}"
                logger.info(msg.format(exec_time_s,
                                       end_dt.strftime("%H:%M:%S")))

            for pos, fio_cfg_slice in enumerate(self.fio_configs):
                fio_cfg_slice = list(fio_cfg_slice)
                names = [i.name for i in fio_cfg_slice]

                # summarize repeated section names as "name * count"
                msgs = []
                already_processed = set()
                for name in names:
                    if name not in already_processed:
                        already_processed.add(name)

                        if 1 == names.count(name):
                            msgs.append(name)
                        else:
                            frmt = "{0} * {1}"
                            msgs.append(frmt.format(name,
                                                    names.count(name)))

                if self.is_primary:
                    logger.info("Will run tests: " + ", ".join(msgs))

                nolog = (pos != 0) or not self.is_primary
                out_err, interval = self.do_run(barrier, fio_cfg_slice, pos,
                                                nolog=nolog)

                try:
                    full_raw_res = json.loads(out_err)

                    res = {"bw": [], "iops": [], "lat": [],
                           "clat": [], "slat": []}

                    for raw_result in full_raw_res['jobs']:
                        load_data = raw_result['mixed']

                        res["bw"].append(load_data["bw"])
                        res["iops"].append(load_data["iops"])
                        res["lat"].append(load_data["lat"]["mean"])
                        res["clat"].append(load_data["clat"]["mean"])
                        res["slat"].append(load_data["slat"]["mean"])

                    # all sections of one slice must describe the same job,
                    # differing only in ramp time
                    first = fio_cfg_slice[0]
                    p1 = first.vals.copy()
                    p1.pop('ramp_time', 0)
                    p1.pop('_ramp_time', 0)

                    for nxt in fio_cfg_slice[1:]:
                        assert nxt.name == first.name
                        # fix: compare on a copy and strip the same keys
                        # from both sides (was popping 'ramp_time' from one
                        # dict but '_ramp_time' from the other, mutating
                        # nxt.vals in place)
                        p2 = nxt.vals.copy()
                        p2.pop('ramp_time', 0)
                        p2.pop('_ramp_time', 0)

                        assert p1 == p2

                    tres = IOTestResults(first,
                                         self.config_params, res,
                                         full_raw_res, interval,
                                         vm_count=self.total_nodes_count)
                    tres.test_name = os.path.basename(self.config_fname)
                    if tres.test_name.endswith('.cfg'):
                        tres.test_name = tres.test_name[:-4]
                    self.on_result_cb(tres)
                except (OSError, StopTestError):
                    raise
                except Exception as exc:
                    msg_templ = "Error during postprocessing results: {0!s}"
                    raise RuntimeError(msg_templ.format(exc))

        finally:
            barrier.exit()

    def do_run(self, barrier, cfg_slice, pos, nolog=False):
        """Upload one config slice to the node and run fio on it.

        Waits on `barrier` so all nodes start together, survives ssh
        disconnects while fio runs, verifies fio's exit code and copies
        the results plus any fio log files to self.log_directory.
        Returns (raw_json_result_str, (begin_ts, end_ts)).
        Raises StopTestError when fio exits with a non-zero code.
        """
        # wrapper script: store fio's combined output and exit code in
        # files, so they survive an ssh reconnect
        bash_file = "#!/bin/bash\n" + \
                    "fio --output-format=json --output={out_file} " + \
                    "--alloc-size=262144 {job_file} " + \
                    " >{err_out_file} 2>&1 \n" + \
                    "echo $? >{res_code_file}\n"

        conn_id = self.node.get_conn_id()
        fconn_id = conn_id.replace(":", "_")

        bash_file = bash_file.format(out_file=self.results_file,
                                     job_file=self.task_file,
                                     err_out_file=self.err_out_file,
                                     res_code_file=self.exit_code_file)

        task_fc = "\n\n".join(map(str, cfg_slice))
        with self.node.connection.open_sftp() as sftp:
            save_to_remote(sftp, self.task_file, task_fc)
            save_to_remote(sftp, self.sh_file, bash_file)

        # keep a local copy of the generated job file for debugging
        fname = "{0}_{1}.fio".format(pos, fconn_id)
        with open(os.path.join(self.log_directory, fname), "w") as fd:
            fd.write(task_fc)

        exec_time = sum(map(execution_time, cfg_slice))
        exec_time_str = sec_to_str(exec_time)

        # hard deadline: expected runtime plus at least 5 minutes of slack
        timeout = int(exec_time + max(300, exec_time))
        soft_tout = exec_time

        barrier.wait()

        if self.is_primary:
            templ = "Test should take about {0}." + \
                    " Should finish at {1}," + \
                    " will wait at most till {2}"
            now_dt = datetime.datetime.now()
            end_dt = now_dt + datetime.timedelta(0, exec_time)
            wait_till = now_dt + datetime.timedelta(0, timeout)

            logger.info(templ.format(exec_time_str,
                                     end_dt.strftime("%H:%M:%S"),
                                     wait_till.strftime("%H:%M:%S")))

        self.run_over_ssh("cd " + os.path.dirname(self.task_file), nolog=True)
        # fix: reuse self.use_sudo instead of re-reading options twice
        task = BGSSHTask(self.node, self.use_sudo)
        begin = time.time()

        if self.use_sudo:
            sudo = "sudo "
        else:
            sudo = ""

        task.start(sudo + "bash " + self.sh_file)

        # fio may outlive the ssh session - on SSHException reconnect
        # and keep waiting until the task completes or times out
        while True:
            try:
                task.wait(soft_tout, timeout)
                break
            except paramiko.SSHException:
                pass

            try:
                self.node.connection.close()
            except Exception:  # fix: was a bare except
                pass

            reconnect(self.node.connection, self.node.conn_url)

        end = time.time()

        if not nolog:
            logger.debug("Test on node {0} is finished".format(conn_id))

        # extra log files fio was asked to produce via the job options
        log_files = set()
        for cfg in cfg_slice:
            if 'write_lat_log' in cfg.vals:
                fname = cfg.vals['write_lat_log']
                log_files.add(fname + '_clat.log')
                log_files.add(fname + '_lat.log')
                log_files.add(fname + '_slat.log')

            if 'write_iops_log' in cfg.vals:
                fname = cfg.vals['write_iops_log']
                log_files.add(fname + '_iops.log')

        with self.node.connection.open_sftp() as sftp:
            result = read_from_remote(sftp, self.results_file)
            exit_code = read_from_remote(sftp, self.exit_code_file)
            err_out = read_from_remote(sftp, self.err_out_file)
            exit_code = exit_code.strip()

            if exit_code != '0':
                msg = "fio exit with code {0}: {1}".format(exit_code, err_out)
                logger.critical(msg.strip())
                raise StopTestError("fio failed")

            sftp.remove(self.results_file)
            sftp.remove(self.err_out_file)
            sftp.remove(self.exit_code_file)

            fname = "{0}_{1}.json".format(pos, fconn_id)
            with open(os.path.join(self.log_directory, fname), "w") as fd:
                fd.write(result)

            for fname in log_files:
                try:
                    fc = read_from_remote(sftp, fname)
                except Exception:  # fix: was a bare except; missing log
                    continue       # files are best-effort, skip them
                sftp.remove(fname)

                loc_fname = "{0}_{1}_{2}".format(pos, fconn_id,
                                                 fname.split('_')[-1])
                loc_path = os.path.join(self.log_directory, loc_fname)
                with open(loc_path, "w") as fd:
                    fd.write(fc)

        return result, (begin, end)

    @classmethod
    def merge_results(cls, results):
        """Merge per-node result dicts into one; all metas must match."""
        merged = results[0]
        for block in results[1:]:
            assert block["__meta__"] == merged["__meta__"]
            merged['res'].extend(block['res'])
        return merged

    @classmethod
    def format_for_console(cls, data, dinfo):
        """Render the processed results (dinfo) as a console table."""
        return format_results_for_console(dinfo)