blob: c65006999028bdc289903848fd5792375cf3a503 [file] [log] [blame]
koder aka kdanilovbc2c8982015-06-13 02:50:43 +03001import re
2import time
3import json
4import os.path
5import logging
6import datetime
7import functools
8import subprocess
9import collections
10
11import yaml
12import paramiko
13import texttable
14from paramiko.ssh_exception import SSHException
15from concurrent.futures import ThreadPoolExecutor
16
17from wally.pretty_yaml import dumps
18from wally.statistic import round_3_digit, data_property
19from wally.utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
20from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask, reconnect)
21
22from .fio_task_parser import (execution_time, fio_cfg_compile,
23 get_test_summary, get_test_sync_mode)
24from ..itest import TimeSeriesValue, PerfTest, TestResults, run_on_node
25
26logger = logging.getLogger("wally")
27
28
29# Results folder structure
30# results/
31# {loadtype}_{num}/
32# config.yaml
33# ......
34
35
class NoData(object):
    """Sentinel type used by cached_prop: the class object itself marks a
    cached-property slot as "not computed yet" (compared with ``is``)."""
    pass
38
39
def cached_prop(func):
    """Property decorator that caches the computed value on the instance.

    The owning class must pre-set an attribute named ``_<func name>`` to the
    ``NoData`` sentinel; the wrapped getter runs once, stores its result
    there, and every later access returns the stored value.
    """
    attr_name = "_" + func.__name__

    @property
    @functools.wraps(func)
    def getter(self):
        cached = getattr(self, attr_name)
        if cached is NoData:
            cached = func(self)
            setattr(self, attr_name, cached)
        return cached

    return getter
50
51
def load_fio_log_file(fname):
    """Parse one fio log file into a TimeSeriesValue.

    Each line looks like ``time_ms, value[, ...]``; only the first two
    fields are used and time is converted to seconds.
    """
    points = []
    with open(fname) as fd:
        for line in fd:
            off_ms, raw_val = line.split(',')[:2]
            points.append((float(off_ms) / 1000, float(raw_val.strip())))
    return TimeSeriesValue(points)
57
58
def load_test_results(cls, folder, run_num):
    """Load all stored results of one test run from *folder*.

    Reads ``<run_num>_params.yaml``, every fio time-series log named
    ``<run_num>_<conn_id>_<type>.<cnt>.log`` and each node's raw fio json
    result ``<run_num>_<conn_id>_rawres.json``.

    cls: result class, instantiated as ``cls(params, res, raw_res)``
    folder: directory containing the result files
    run_num: run index used as the file-name prefix

    Duplicate time-series files for the same (type, connection) pair are
    ignored with a warning (first one wins).
    """
    res = {}

    fn = os.path.join(folder, str(run_num) + '_params.yaml')
    # NOTE(review): yaml.load without an explicit Loader is unsafe on
    # untrusted input; these files are produced by wally itself
    params = yaml.load(open(fn).read())

    conn_ids = set()
    rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
    for fname in os.listdir(folder):
        rm = re.match(rr, fname)
        if rm is None:
            continue

        conn_id_s = rm.group('conn_id')
        # file names store ':' as '_'; map back to the real conn id
        conn_id = conn_id_s.replace('_', ':')
        ftype = rm.group('type')

        if ftype not in ('iops', 'bw', 'lat'):
            continue

        ts = load_fio_log_file(os.path.join(folder, fname))
        if conn_id in res.get(ftype, {}):
            # keep the first series; previously this was silently skipped
            # via a swallowed AssertionError
            logger.warning("Duplicate fio log file %r ignored", fname)
        else:
            res.setdefault(ftype, {})[conn_id] = ts

        conn_ids.add(conn_id)

    raw_res = {}
    for conn_id in conn_ids:
        # BUGFIX: the file name must be derived from the current conn_id;
        # previously it reused conn_id_s left over from the last iteration
        # of the loop above, so only one (arbitrary) name was ever used.
        conn_id_s = conn_id.replace(':', '_')
        fn = os.path.join(folder, "{0}_{1}_rawres.json".format(run_num, conn_id_s))

        # fio prepends a status message before the json body - drop it
        fc = "{" + open(fn).read().split('{', 1)[1]
        raw_res[conn_id] = json.loads(fc)

    return cls(params, res, raw_res)
100
101
class Attrmapper(object):
    """Read-only attribute view over a plain dict: ``obj.key`` -> ``dct["key"]``.

    Missing keys surface as AttributeError, so ``hasattr``/``getattr``
    defaults behave as expected.
    """
    def __init__(self, dct):
        self.__dct = dct

    def __getattr__(self, name):
        if name in self.__dct:
            return self.__dct[name]
        raise AttributeError(name)
111
112
class DiskPerfInfo(object):
    """Aggregated performance numbers for one fio test.

    The statistic fields (bw/iops/lat/lat_50/lat_95) and the raw_* lists
    start empty and are filled in later by the caller (see
    IOTestResult.disk_perf_info).
    """
    def __init__(self, name, summary, params, testnodes_count):
        self.name = name
        self.summary = summary
        self.params = params
        self.testnodes_count = testnodes_count

        # aggregated statistics, assigned by the caller
        self.bw = None
        self.iops = None
        self.lat = None
        self.lat_50 = None
        self.lat_95 = None

        # per-node raw series, assigned by the caller
        self.raw_bw = []
        self.raw_iops = []
        self.raw_lat = []

        # attribute-style access to the fio job parameters
        self.p = Attrmapper(self.params['vals'])

        self.sync_mode = get_test_sync_mode(self.params['vals'])
        self.concurence = self.params['vals'].get('numjobs', 1)
133
134
def get_lat_perc_50_95(lat_mks):
    """Estimate the 50th and 95th latency percentiles from a fio latency
    histogram.

    lat_mks maps a bucket bound (microseconds) to the percentage of samples
    that fell into that bucket. Percentiles are linearly interpolated
    between neighbouring bucket bounds and returned in milliseconds as a
    ``(perc_50, perc_95)`` tuple.
    """
    def interpolate(target, accumulated, share, right, left):
        # first bucket, or a nearly-empty one: use the bucket bound as-is
        if left is None or share < 1.:
            return right
        return (target - accumulated) / share * (right - left) + left

    accumulated = 0
    perc_50 = None
    perc_95 = None
    prev_key = None

    for key, share in sorted(lat_mks.items()):
        if perc_50 is None and accumulated + share >= 50:
            perc_50 = interpolate(50., accumulated, share, key, prev_key)

        if accumulated + share >= 95:
            perc_95 = interpolate(95., accumulated, share, key, prev_key)
            break

        prev_key = key
        accumulated += share

    return perc_50 / 1000., perc_95 / 1000.
158
159
def prepare(ramp_time, data, avg_interval):
    """Post-process a ``{key: TimeSeriesValue}`` mapping.

    Drops the first *ramp_time* seconds of every series (fio warm-up) and
    resamples each one with *avg_interval*. Returns None when *data* is
    None.
    """
    if data is None:
        return None

    prepared = {}
    for key, series in data.items():
        trimmed = series.skip(ramp_time) if ramp_time > 0 else series
        prepared[key] = trimmed.derived(avg_interval)
    return prepared
171
172
class IOTestResult(TestResults):
    """
    Fio run results

    config: TestConfig
    fio_task: FioJobSection
    ts_results: {str: TimeSeriesValue} - per-metric time series ('bw',
                'lat', 'iops'), keyed by connection id
    raw_result: {conn_id: parsed fio json output}
    run_interval: (float, float) - test run time, used for sensors
    """
    def __init__(self, config, fio_task, ts_results, raw_result, run_interval):

        self.name = fio_task.name.split("_")[0]
        self.fio_task = fio_task

        # drop the fio warm-up period from every time series
        ramp_time = fio_task.vals.get('ramp_time', 0)

        self.bw = prepare(ramp_time, ts_results.get('bw'), 1.0)
        self.lat = prepare(ramp_time, ts_results.get('lat'), 1.0)
        self.iops = prepare(ramp_time, ts_results.get('iops'), 1.0)

        res = {"bw": self.bw, "lat": self.lat, "iops": self.iops}

        self.sensors_data = None
        self._pinfo = None  # cache for disk_perf_info
        TestResults.__init__(self, config, res, raw_result, run_interval)

    def summary(self):
        """Short test id: fio summary + 'vm' + number of test nodes."""
        return get_test_summary(self.fio_task) + "vm" \
               + str(len(self.config.nodes))

    def get_yamable(self):
        """Representation stored into yaml reports."""
        return self.summary()

    @property
    def disk_perf_info(self):
        """Aggregate the raw fio results into a DiskPerfInfo (cached)."""
        if self._pinfo is not None:
            return self._pinfo

        # merge latency histograms of all jobs on all nodes, converting
        # both the 'latency_ms' and 'latency_us' buckets to microseconds;
        # '>=N' overflow buckets are counted at their lower bound
        lat_mks = collections.defaultdict(int)
        num_res = 0

        for _, result in self.raw_result.items():
            num_res += len(result['jobs'])
            for job_info in result['jobs']:
                for k, v in job_info['latency_ms'].items():
                    if isinstance(k, basestring) and k.startswith('>='):
                        lat_mks[int(k[2:]) * 1000] += v
                    else:
                        lat_mks[int(k) * 1000] += v

                for k, v in job_info['latency_us'].items():
                    lat_mks[int(k)] += v

        # average the percentage shares over all jobs
        for k, v in lat_mks.items():
            lat_mks[k] = float(v) / num_res

        # BUGFIX: was len(self.fio_raw_res), an attribute never assigned
        # anywhere in this class; the per-node data lives in raw_result
        # (see the loop above)
        testnodes_count = len(self.raw_result)

        # NOTE(review): self.params is presumably set by
        # TestResults.__init__ - confirm against itest.TestResults
        pinfo = DiskPerfInfo(self.name,
                             self.summary(),
                             self.params,
                             testnodes_count)

        # per-node raw series, then node-wise sums / concatenation
        pinfo.raw_bw = [ts.vals() for ts in self.bw.values()]
        pinfo.raw_iops = [ts.vals() for ts in self.iops.values()]
        pinfo.raw_lat = [ts.vals() for ts in self.lat.values()]

        pinfo.bw = data_property(map(sum, zip(*pinfo.raw_bw)))
        pinfo.iops = data_property(map(sum, zip(*pinfo.raw_iops)))
        pinfo.lat = data_property(sum(pinfo.raw_lat, []))
        pinfo.lat_50, pinfo.lat_95 = get_lat_perc_50_95(lat_mks)

        self._pinfo = pinfo

        return pinfo
250
251
class IOPerfTest(PerfTest):
    """Runs fio-based IO load tests on a set of remote nodes over ssh."""

    # timeouts / cycle lengths, seconds
    tcp_conn_timeout = 30
    max_pig_timeout = 5
    soft_runcycle = 5 * 60

    def __init__(self, config):
        """Compile fio job configs and precompute remote file paths.

        config.params must contain 'cfg' (fio config name or path) and may
        contain 'params', 'use_system_fio', 'use_sudo', 'test_logging' and
        'alive_check_interval'.
        """
        PerfTest.__init__(self, config)

        get = self.config.params.get
        do_get = self.config.params.__getitem__

        self.config_fname = do_get('cfg')

        # a bare name (no path, no extension) refers to a bundled .cfg file
        # shipped next to this module
        if '/' not in self.config_fname and '.' not in self.config_fname:
            cfgs_dir = os.path.dirname(__file__)
            self.config_fname = os.path.join(cfgs_dir,
                                             self.config_fname + '.cfg')

        self.alive_check_interval = get('alive_check_interval')
        self.use_system_fio = get('use_system_fio', False)

        self.config_params = get('params', {}).copy()

        # remote-side file locations used by do_run()
        self.io_py_remote = self.join_remote("agent.py")
        self.results_file = self.join_remote("results.json")
        self.pid_file = self.join_remote("pid")
        self.task_file = self.join_remote("task.cfg")
        self.sh_file = self.join_remote("cmd.sh")
        self.err_out_file = self.join_remote("fio_err_out")
        self.exit_code_file = self.join_remote("exit_code")

        self.use_sudo = get("use_sudo", True)
        self.test_logging = get("test_logging", False)

        self.raw_cfg = open(self.config_fname).read()
        self.fio_configs = fio_cfg_compile(self.raw_cfg,
                                           self.config_fname,
                                           self.config_params,
                                           split_on_names=self.test_logging)
        self.fio_configs = list(self.fio_configs)

    @classmethod
    def load(cls, folder):
        """Yield IOTestResult objects for every stored run in *folder*."""
        for fname in os.listdir(folder):
            # NOTE(review): the '.' in the pattern is unescaped and the
            # pattern only anchors at the end - slightly looser match than
            # the intended literal "<num>_params.yaml"
            if re.match("\d+_params.yaml$", fname):
                num = int(fname.split('_')[0])
                yield load_test_results(IOTestResult, folder, num)

    def cleanup(self):
        """Remote cleanup hook - currently a no-op."""
        # delete_file(conn, self.io_py_remote)
        # Need to remove tempo files, used for testing
        pass

    def prefill_test_files(self, files, rossh):
        """Fill every test file with data using a sequential fio write.

        files: {filename: size_mb}
        rossh: callable executing a shell command on the node

        Returns the achieved fill bandwidth in MiBps, or None when the run
        was too fast to measure.
        """
        cmd_templ = "fio --name=xxx --filename={0} --direct=1" + \
                    " --bs=4m --size={1}m --rw=write"

        if self.use_sudo:
            cmd_templ = "sudo " + cmd_templ

        ssize = 0
        stime = time.time()

        for fname, curr_sz in files.items():
            cmd = cmd_templ.format(fname, curr_sz)
            ssize += curr_sz

            # command timeout proportional to the file size in MiB
            rossh(cmd, timeout=curr_sz)

        ddtime = time.time() - stime
        if ddtime > 1E-3:
            fill_bw = int(ssize / ddtime)
            mess = "Initiall dd fill bw is {0} MiBps for this vm"
            logger.info(mess.format(fill_bw))
            return fill_bw

    def install_utils(self, rossh, max_retry=3, timeout=5):
        """Install missing packages (screen, optionally fio) on the node.

        Uses yum on redhat-like systems and apt-get otherwise, retrying up
        to max_retry times with *timeout* seconds between attempts.
        """
        need_install = []
        packs = [('screen', 'screen')]

        if self.use_system_fio:
            packs.append(('fio', 'fio'))
        else:
            # define OS and x32/x64
            # copy appropriate fio
            # add fio deps
            pass

        for bin_name, package in packs:
            if bin_name is None:
                need_install.append(package)
                continue

            try:
                # a failing `which` surfaces as OSError from rossh
                rossh('which ' + bin_name, nolog=True)
            except OSError:
                need_install.append(package)

        if len(need_install) == 0:
            return

        if 'redhat' == get_os(rossh):
            cmd = "sudo yum -y install " + " ".join(need_install)
        else:
            cmd = "sudo apt-get -y install " + " ".join(need_install)

        for _ in range(max_retry):
            try:
                rossh(cmd)
                break
            except OSError as err:
                time.sleep(timeout)
        else:
            # NOTE(review): on Python 3 `err` is unbound here (cleared when
            # the except block exits); works on Python 2, which this module
            # targets (see `basestring` usage elsewhere in the file)
            raise OSError("Can't install - " + str(err))

    def pre_run(self):
        """Prepare every node in parallel: create the remote work dir,
        prefill test files (unless the 'prefill_files' option disables it)
        and install required utilities."""
        prefill = False
        prefill = self.config.options.get('prefill_files', True)

        if prefill:
            files = {}
            for cfg_slice in self.fio_configs:
                for section in cfg_slice:
                    # round the file size up to whole MiB
                    sz = ssize2b(section.vals['size'])
                    msz = sz / (1024 ** 2)

                    if sz % (1024 ** 2) != 0:
                        msz += 1

                    fname = section.vals['filename']

                    # if already has other test with the same file name
                    # take largest size
                    files[fname] = max(files.get(fname, 0), msz)
        else:
            files = None
            logger.warning("Prefilling of test files is disabled")

        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
            fc = functools.partial(self.pre_run_th, files=files)
            list(pool.map(fc, self.config.nodes))

    def pre_run_th(self, node, files):
        """Per-node part of pre_run: make the remote dir, prefill files,
        install utilities. Raises StopTestError if the dir can't be made."""
        # fill files with pseudo-random data
        rossh = run_on_node(node)

        try:
            cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
            if self.use_sudo:
                cmd = "sudo " + cmd
                # NOTE(review): `self.node` is never assigned in this class;
                # this most likely should use the `node` parameter - verify
                cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
                                                      self.config.remote_dir)

            rossh(cmd)
        except Exception as exc:
            msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
            msg = msg.format(self.config.remote_dir, self.node.get_conn_id(), exc)
            logger.exception(msg)
            raise StopTestError(msg, exc)

        if files is not None:
            # NOTE(review): argument order disagrees with the definition
            # prefill_test_files(self, files, rossh) - confirm which side
            # is wrong
            self.prefill_test_files(rossh, files)

        self.install_utils(rossh)

    def run(self):
        """Execute every compiled fio config on all nodes.

        Writes the job file, run parameters and downloaded results into
        config.log_directory, then loads them back and returns the list of
        per-config results.
        """
        if len(self.fio_configs) > 1:
            # +10% - is a rough estimation for additional operations
            # like sftp, etc
            exec_time = int(sum(map(execution_time, self.fio_configs)) * 1.1)
            exec_time_s = sec_to_str(exec_time)
            now_dt = datetime.datetime.now()
            end_dt = now_dt + datetime.timedelta(0, exec_time)
            msg = "Entire test should takes aroud: {0} and finished at {1}"
            logger.info(msg.format(exec_time_s,
                                   end_dt.strftime("%H:%M:%S")))

        # NOTE(review): tname is computed but never used below
        tname = os.path.basename(self.config_fname)
        if tname.endswith('.cfg'):
            tname = tname[:-4]

        barrier = Barrier(len(self.config.nodes))
        results = []

        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
            for pos, fio_cfg in enumerate(self.fio_configs):
                logger.info("Will run {0} test".format(fio_cfg.name))

                templ = "Test should takes about {0}." + \
                        " Should finish at {1}," + \
                        " will wait at most till {2}"
                exec_time = execution_time(fio_cfg)
                exec_time_str = sec_to_str(exec_time)
                # hard timeout: at least 5 extra minutes on top of exec time
                timeout = int(exec_time + max(300, exec_time))

                now_dt = datetime.datetime.now()
                end_dt = now_dt + datetime.timedelta(0, exec_time)
                wait_till = now_dt + datetime.timedelta(0, timeout)

                logger.info(templ.format(exec_time_str,
                                         end_dt.strftime("%H:%M:%S"),
                                         wait_till.strftime("%H:%M:%S")))

                func = functools.partial(self.do_run,
                                         barrier=barrier,
                                         fio_cfg=fio_cfg,
                                         pos=pos)

                # retry the whole per-node batch on ssh/environment errors
                max_retr = 3
                for idx in range(max_retr):
                    try:
                        intervals = list(pool.map(func, self.config.nodes))
                        break
                    except (EnvironmentError, SSHException) as exc:
                        logger.exception("During fio run")
                        if idx == max_retr - 1:
                            raise StopTestError("Fio failed", exc)

                        logger.info("Sleeping 30s and retrying")
                        time.sleep(30)

                # persist the fio job file used for this run
                fname = "{0}_task.fio".format(pos)
                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
                    fd.write(str(fio_cfg))

                params = {'vm_count': len(self.config.nodes)}
                params['name'] = fio_cfg.name
                params['vals'] = dict(fio_cfg.vals.items())
                params['intervals'] = intervals
                params['nodes'] = [node.get_conn_id() for node in self.config.nodes]

                fname = "{0}_params.yaml".format(pos)
                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
                    fd.write(dumps(params))

                # NOTE(review): load_test_results takes (cls, folder,
                # run_num) - the cls argument is missing here; compare with
                # IOPerfTest.load above
                res = load_test_results(self.config.log_directory, pos)
                results.append(res)
        return results

    def do_run(self, node, barrier, fio_cfg, pos, nolog=False):
        """Run one fio config on one node and download its results.

        Uploads the job file and a wrapper script, waits on *barrier* so
        all nodes start together, runs fio in background over ssh, then
        fetches the json result and fio log files into
        config.log_directory. Returns (begin, end) unix timestamps of the
        run. Raises StopTestError when fio exits non-zero.
        """
        exec_folder = os.path.dirname(self.task_file)
        # wrapper script: run fio, capture its output and exit code into
        # files that are read back after the run
        bash_file = "#!/bin/bash\n" + \
                    "cd {exec_folder}\n" + \
                    "fio --output-format=json --output={out_file} " + \
                    "--alloc-size=262144 {job_file} " + \
                    " >{err_out_file} 2>&1 \n" + \
                    "echo $? >{res_code_file}\n"

        bash_file = bash_file.format(out_file=self.results_file,
                                     job_file=self.task_file,
                                     err_out_file=self.err_out_file,
                                     res_code_file=self.exit_code_file,
                                     exec_folder=exec_folder)

        run_on_node(node)("mkdir -p {0}".format(exec_folder), nolog=True)

        # guard against wiping the filesystem root below
        assert exec_folder != "" and exec_folder != "/"
        run_on_node(node)("rm -rf {0}/*".format(exec_folder), nolog=True)

        with node.connection.open_sftp() as sftp:
            save_to_remote(sftp, self.task_file, str(fio_cfg))
            save_to_remote(sftp, self.sh_file, bash_file)

        exec_time = execution_time(fio_cfg)

        # hard timeout: at least 5 extra minutes on top of the exec time
        timeout = int(exec_time + max(300, exec_time))
        soft_tout = exec_time

        begin = time.time()

        if self.use_sudo:
            sudo = "sudo "
        else:
            sudo = ""

        # snapshot folder content to later detect files created by fio
        fnames_before = run_on_node(node)("ls -1 " + exec_folder, nolog=True)

        # start the load on all nodes simultaneously
        barrier.wait()

        task = BGSSHTask(node, self.use_sudo)
        task.start(sudo + "bash " + self.sh_file)

        # wait for fio to finish, reconnecting if the ssh channel drops
        while True:
            try:
                task.wait(soft_tout, timeout)
                break
            except paramiko.SSHException:
                pass

            try:
                node.connection.close()
            except:
                pass

            reconnect(node.connection, node.conn_url)

        end = time.time()
        rossh = run_on_node(node)
        fnames_after = rossh("ls -1 " + exec_folder, nolog=True)

        conn_id = node.get_conn_id().replace(":", "_")
        if not nolog:
            logger.debug("Test on node {0} is finished".format(conn_id))

        # expected log-file name prefixes, derived from the fio job options
        log_files_pref = []
        if 'write_lat_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_lat_log']
            log_files_pref.append(fname + '_clat')
            log_files_pref.append(fname + '_lat')
            log_files_pref.append(fname + '_slat')

        if 'write_iops_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_iops_log']
            log_files_pref.append(fname + '_iops')

        if 'write_bw_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_bw_log']
            log_files_pref.append(fname + '_bw')

        # group new log files by metric type: {type: [(job_index, fname)]}
        files = collections.defaultdict(lambda: [])
        all_files = [os.path.basename(self.results_file)]
        new_files = set(fnames_after.split()) - set(fnames_before.split())
        for fname in new_files:
            if fname.endswith('.log') and fname.split('.')[0] in log_files_pref:
                name, _ = os.path.splitext(fname)
                if fname.count('.') == 1:
                    # single job - no per-job counter in the file name
                    tp = name.split("_")[-1]
                    cnt = 0
                else:
                    tp_cnt = name.split("_")[-1]
                    tp, cnt = tp_cnt.split('.')
                files[tp].append((int(cnt), fname))
                all_files.append(fname)

        # pack the results on the node, download and unpack them locally
        arch_name = self.join_remote('wally_result.tar.gz')
        tmp_dir = os.path.join(self.config.log_directory, 'tmp_' + conn_id)
        os.mkdir(tmp_dir)
        loc_arch_name = os.path.join(tmp_dir, 'wally_result.{0}.tar.gz'.format(conn_id))
        file_full_names = " ".join(all_files)

        try:
            os.unlink(loc_arch_name)
        except:
            pass

        with node.connection.open_sftp() as sftp:
            exit_code = read_from_remote(sftp, self.exit_code_file)
            err_out = read_from_remote(sftp, self.err_out_file)
            exit_code = exit_code.strip()

            if exit_code != '0':
                msg = "fio exit with code {0}: {1}".format(exit_code, err_out)
                logger.critical(msg.strip())
                raise StopTestError("fio failed")

            rossh("rm -f {0}".format(arch_name), nolog=True)
            cmd = "cd {0} ; tar zcvf {1} {2}".format(exec_folder, arch_name, file_full_names)
            rossh(cmd, nolog=True)
            sftp.get(arch_name, loc_arch_name)

        cmd = "cd {0} ; tar xvzf {1} >/dev/null".format(tmp_dir, loc_arch_name)
        subprocess.check_call(cmd, shell=True)
        os.unlink(loc_arch_name)

        # rename the per-type logs into <pos>_<conn_id>_<type>.<idx>.log,
        # the layout expected by load_test_results()
        for ftype, fls in files.items():
            for idx, fname in fls:
                cname = os.path.join(tmp_dir, fname)
                loc_fname = "{0}_{1}_{2}.{3}.log".format(pos, conn_id, ftype, idx)
                loc_path = os.path.join(self.config.log_directory, loc_fname)
                os.rename(cname, loc_path)

        cname = os.path.join(tmp_dir,
                             os.path.basename(self.results_file))
        loc_fname = "{0}_{1}_rawres.json".format(pos, conn_id)
        loc_path = os.path.join(self.config.log_directory, loc_fname)
        os.rename(cname, loc_path)

        os.rmdir(tmp_dir)
        return begin, end

    @classmethod
    def format_for_console(cls, data, dinfo):
        """
        create a table with io performance report
        for console
        """

        def getconc(data):
            # thread count per node: 'numjobs', falling back to 'concurence'
            th_count = data.params['vals'].get('numjobs')

            if th_count is None:
                th_count = data.params['vals'].get('concurence', 1)
            return th_count

        def key_func(data):
            # sort key: (test name, rw pattern, sync mode, block size,
            # total thread count over all nodes)
            p = data.params['vals']

            th_count = getconc(data)

            return (data.name.rsplit("_", 1)[0],
                    p['rw'],
                    get_test_sync_mode(data.params),
                    ssize2b(p['blocksize']),
                    int(th_count) * data.testnodes_count)

        tab = texttable.Texttable(max_width=120)
        tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
        tab.set_cols_align(["l", "l", "r", "r", "r", "r", "r", "r", "r"])

        items = sorted(dinfo.values(), key=key_func)

        prev_k = None
        header = ["Name", "Description", "iops\ncum", "KiBps\ncum",
                  "Cnf\n95%", "Dev%", "iops\nper vm", "KiBps\nper vm", "lat\nms"]

        for data in items:
            curr_k = key_func(data)[:4]

            # draw a separator row between groups of related tests
            if prev_k is not None:
                if prev_k != curr_k:
                    tab.add_row(
                        ["-------", "-----------", "-----", "------",
                         "---", "----", "------", "---", "-----"])

            prev_k = curr_k

            test_dinfo = dinfo[(data.name, data.summary)]

            iops, _ = test_dinfo.iops.rounded_average_conf()

            bw, bw_conf = test_dinfo.bw.rounded_average_conf()
            _, bw_dev = test_dinfo.bw.rounded_average_dev()
            # confidence / deviation as a percentage of average bandwidth
            conf_perc = int(round(bw_conf * 100 / bw))
            dev_perc = int(round(bw_dev * 100 / bw))

            lat, _ = test_dinfo.lat.rounded_average_conf()
            # convert to ms (source presumably in microseconds - the header
            # says "lat ms"; confirm against TimeSeriesValue units)
            lat = round_3_digit(int(lat) // 1000)

            iops_per_vm = round_3_digit(iops / data.testnodes_count)
            bw_per_vm = round_3_digit(bw / data.testnodes_count)

            iops = round_3_digit(iops)
            bw = round_3_digit(bw)

            params = (data.name.rsplit('_', 1)[0],
                      data.summary, int(iops), int(bw), str(conf_perc),
                      str(dev_perc),
                      int(iops_per_vm), int(bw_per_vm), lat)
            tab.add_row(params)

        tab.header(header)

        return tab.draw()