import re
import time
import json
import os.path
import logging
import datetime
import functools
import subprocess
import collections

import yaml
import paramiko
import texttable
from paramiko.ssh_exception import SSHException
from concurrent.futures import ThreadPoolExecutor

from wally.pretty_yaml import dumps
from wally.statistic import round_3_digit, data_property
from wally.utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask, reconnect)

from .fio_task_parser import (execution_time, fio_cfg_compile,
                              get_test_summary, get_test_sync_mode)
from ..itest import TimeSeriesValue, PerfTest, TestResults, run_on_node

logger = logging.getLogger("wally")


# Results folder structure
# results/
#     {loadtype}_{num}/
#         config.yaml
#         ......


class NoData(object):
    pass


def cached_prop(func):
    @property
    @functools.wraps(func)
    def closure(self):
        val = getattr(self, "_" + func.__name__)
        if val is NoData:
            val = func(self)
            setattr(self, "_" + func.__name__, val)
        return val
    return closure
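
# Usage sketch for cached_prop (the Example class below is illustrative,
# not part of this module): the owner pre-seeds the backing attribute
# with the NoData sentinel; the first access computes and stores the
# value, later accesses return it unchanged.
#
#     class Example(object):
#         _heavy = NoData
#
#         @cached_prop
#         def heavy(self):
#             return expensive_computation()  # runs only once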


def load_fio_log_file(fname):
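    """Parse one fio log (bw/iops/lat) into a TimeSeriesValue.

    Each line is expected to look like "msec, value, direction, bs";
    only the first two columns are used and the timestamp is converted
    from milliseconds to seconds.
    """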
    with open(fname) as fd:
        it = [ln.split(',')[:2] for ln in fd]
    vals = [(float(off) / 1000, float(val.strip())) for off, val in it]
    return TimeSeriesValue(vals)


def load_test_results(cls, folder, run_num):
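    """Load one test run from disk and return cls(params, res, raw_res).

    Expected per-run layout (inferred from the code below):
        {run_num}_params.yaml                - test parameters
        {run_num}_{conn_id}_{type}.{cnt}.log - fio iops/bw/lat logs
        {run_num}_{conn_id}_rawres.json      - raw fio JSON output
    """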
    res = {}

    fn = os.path.join(folder, str(run_num) + '_params.yaml')
    params = yaml.load(open(fn).read())

    conn_ids = set()
    for fname in os.listdir(folder):
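        # log names look like "{run_num}_{conn_id}_{type}.{cnt}.log",
        # e.g. "0_192.168.0.1_3022_iops.1.log" (hypothetical conn id,
        # with ':' replaced by '_')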
        rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
        rm = re.match(rr, fname)
        if rm is None:
            continue

        conn_id_s = rm.group('conn_id')
        conn_id = conn_id_s.replace('_', ':')
        ftype = rm.group('type')

        if ftype not in ('iops', 'bw', 'lat'):
            continue

        try:
            ts = load_fio_log_file(os.path.join(folder, fname))
            if ftype in res:
                assert conn_id not in res[ftype]

            res.setdefault(ftype, {})[conn_id] = ts
        except AssertionError:
            pass

        conn_ids.add(conn_id)

    raw_res = {}
    for conn_id in conn_ids:
        conn_id_s = conn_id.replace(':', '_')
        fn = os.path.join(folder, "{0}_{1}_rawres.json".format(run_num, conn_id_s))

        # fio may print a warning/message before the JSON body - cut
        # everything up to the first '{'
        fc = "{" + open(fn).read().split('{', 1)[1]
        raw_res[conn_id] = json.loads(fc)

    return cls(params, res, raw_res)


class Attrmapper(object):
    def __init__(self, dct):
        self.__dct = dct

    def __getattr__(self, name):
        try:
            return self.__dct[name]
        except KeyError:
            raise AttributeError(name)
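
# Attrmapper exposes dict keys as attributes, e.g. (illustrative)
# Attrmapper({'rw': 'randread'}).rw == 'randread'; a missing key raises
# AttributeError instead of KeyError.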


class DiskPerfInfo(object):
    def __init__(self, name, summary, params, testnodes_count):
        self.name = name
        self.bw = None
        self.iops = None
        self.lat = None
        self.lat_50 = None
        self.lat_95 = None

        self.raw_bw = []
        self.raw_iops = []
        self.raw_lat = []

        self.params = params
        self.testnodes_count = testnodes_count
        self.summary = summary
        self.p = Attrmapper(self.params['vals'])

        self.sync_mode = get_test_sync_mode(self.params['vals'])
        self.concurence = self.params['vals'].get('numjobs', 1)


def get_lat_perc_50_95(lat_mks):
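    """Estimate the 50th and 95th latency percentiles, in milliseconds.

    lat_mks maps a latency-bucket boundary (microseconds) to the percent
    of IOs that fell into that bucket; percentiles are linearly
    interpolated between neighbouring buckets.  Worked example with
    made-up numbers: {1000: 40.0, 2000: 40.0, 4000: 20.0} gives
    perc_50 = (50 - 40) / 40 * (2000 - 1000) + 1000 = 1250 us = 1.25 ms.
    """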
    curr_perc = 0
    perc_50 = None
    perc_95 = None
    pkey = None
    for key, val in sorted(lat_mks.items()):
        if curr_perc + val >= 50 and perc_50 is None:
            if pkey is None or val < 1.:
                perc_50 = key
            else:
                perc_50 = (50. - curr_perc) / val * (key - pkey) + pkey

        if curr_perc + val >= 95:
            if pkey is None or val < 1.:
                perc_95 = key
            else:
                perc_95 = (95. - curr_perc) / val * (key - pkey) + pkey
            break

        pkey = key
        curr_perc += val

    return perc_50 / 1000., perc_95 / 1000.


def prepare(ramp_time, data, avg_interval):
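    """Drop the ramp-up interval from every time series in data and
    resample each one to avg_interval (both in seconds); assumes
    TimeSeriesValue provides skip() and derived() as used below."""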
    if data is None:
        return data

    res = {}
    for key, ts_data in data.items():
        if ramp_time > 0:
            ts_data = ts_data.skip(ramp_time)

        res[key] = ts_data.derived(avg_interval)
    return res


class IOTestResult(TestResults):
    """
    Fio run results
    config: TestConfig
    fio_task: FioJobSection
    ts_results: {str: TimeSeriesValue}
    raw_result: {str: dict} - parsed fio JSON output per connection
    run_interval: (float, float) - test run time, used for sensors
    """
    def __init__(self, config, fio_task, ts_results, raw_result, run_interval):

        self.name = fio_task.name.split("_")[0]
        self.fio_task = fio_task

        ramp_time = fio_task.vals.get('ramp_time', 0)

        self.bw = prepare(ramp_time, ts_results.get('bw'), 1.0)
        self.lat = prepare(ramp_time, ts_results.get('lat'), 1.0)
        self.iops = prepare(ramp_time, ts_results.get('iops'), 1.0)
        # self.slat = drop_warmup(res.get('clat', None), self.params)
        # self.clat = drop_warmup(res.get('slat', None), self.params)

        res = {"bw": self.bw, "lat": self.lat, "iops": self.iops}

        self.sensors_data = None
        self._pinfo = None
        TestResults.__init__(self, config, res, raw_result, run_interval)

    def summary(self):
        return get_test_summary(self.fio_task) + "vm" \
               + str(len(self.config.nodes))

    def get_yamable(self):
        return self.summary()

    @property
    def disk_perf_info(self):
        if self._pinfo is not None:
            return self._pinfo

        lat_mks = collections.defaultdict(int)
        num_res = 0
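
        # merge fio's per-job "latency_ms" and "latency_us" percentage
        # histograms into one histogram keyed by microseconds; the
        # ">=NNN" overflow bucket is folded into its lower bound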
        for _, result in self.raw_result.items():
            num_res += len(result['jobs'])
            for job_info in result['jobs']:
                for k, v in job_info['latency_ms'].items():
                    if isinstance(k, basestring) and k.startswith('>='):
                        lat_mks[int(k[2:]) * 1000] += v
                    else:
                        lat_mks[int(k) * 1000] += v

                for k, v in job_info['latency_us'].items():
                    lat_mks[int(k)] += v

        for k, v in lat_mks.items():
            lat_mks[k] = float(v) / num_res

        testnodes_count = len(self.raw_result)

        pinfo = DiskPerfInfo(self.name,
                             self.summary(),
                             self.params,
                             testnodes_count)

        pinfo.raw_bw = [res.vals() for res in self.bw.values()]
        pinfo.raw_iops = [res.vals() for res in self.iops.values()]
        pinfo.raw_lat = [res.vals() for res in self.lat.values()]

        pinfo.bw = data_property(map(sum, zip(*pinfo.raw_bw)))
        pinfo.iops = data_property(map(sum, zip(*pinfo.raw_iops)))
        pinfo.lat = data_property(sum(pinfo.raw_lat, []))
        pinfo.lat_50, pinfo.lat_95 = get_lat_perc_50_95(lat_mks)

        self._pinfo = pinfo

        return pinfo


class IOPerfTest(PerfTest):
    tcp_conn_timeout = 30
    max_pig_timeout = 5
    soft_runcycle = 5 * 60

    def __init__(self, config):
        PerfTest.__init__(self, config)

        get = self.config.params.get
        do_get = self.config.params.__getitem__

        self.config_fname = do_get('cfg')

        if '/' not in self.config_fname and '.' not in self.config_fname:
            cfgs_dir = os.path.dirname(__file__)
            self.config_fname = os.path.join(cfgs_dir,
                                             self.config_fname + '.cfg')

        self.alive_check_interval = get('alive_check_interval')
        self.use_system_fio = get('use_system_fio', False)

        self.config_params = get('params', {}).copy()

        self.io_py_remote = self.join_remote("agent.py")
        self.results_file = self.join_remote("results.json")
        self.pid_file = self.join_remote("pid")
        self.task_file = self.join_remote("task.cfg")
        self.sh_file = self.join_remote("cmd.sh")
        self.err_out_file = self.join_remote("fio_err_out")
        self.exit_code_file = self.join_remote("exit_code")

        self.use_sudo = get("use_sudo", True)
        self.test_logging = get("test_logging", False)

        self.raw_cfg = open(self.config_fname).read()
        self.fio_configs = fio_cfg_compile(self.raw_cfg,
                                           self.config_fname,
                                           self.config_params,
                                           split_on_names=self.test_logging)
        self.fio_configs = list(self.fio_configs)

    @classmethod
    def load(cls, folder):
        for fname in os.listdir(folder):
            if re.match(r"\d+_params\.yaml$", fname):
                num = int(fname.split('_')[0])
                yield load_test_results(IOTestResult, folder, num)

    def cleanup(self):
        # delete_file(conn, self.io_py_remote)
        # Need to remove temporary files used for testing
        pass

    def prefill_test_files(self, files, rossh):
        cmd_templ = "fio --name=xxx --filename={0} --direct=1" + \
                    " --bs=4m --size={1}m --rw=write"

        if self.use_sudo:
            cmd_templ = "sudo " + cmd_templ

        ssize = 0
        stime = time.time()

        for fname, curr_sz in files.items():
            cmd = cmd_templ.format(fname, curr_sz)
            ssize += curr_sz
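
            # the timeout (in seconds) equals the file size in MiB,
            # i.e. it assumes the node can write at least 1 MiBps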
            rossh(cmd, timeout=curr_sz)

        ddtime = time.time() - stime
        if ddtime > 1E-3:
            fill_bw = int(ssize / ddtime)
            mess = "Initial fill bw is {0} MiBps for this vm"
            logger.info(mess.format(fill_bw))
            return fill_bw

    def install_utils(self, rossh, max_retry=3, timeout=5):
        need_install = []
        packs = [('screen', 'screen')]

        if self.use_system_fio:
            packs.append(('fio', 'fio'))
        else:
            # define OS and x32/x64
            # copy appropriate fio
            # add fio deps
            pass

        for bin_name, package in packs:
            if bin_name is None:
                need_install.append(package)
                continue

            try:
                rossh('which ' + bin_name, nolog=True)
            except OSError:
                need_install.append(package)

        if len(need_install) == 0:
            return

        if 'redhat' == get_os(rossh):
            cmd = "sudo yum -y install " + " ".join(need_install)
        else:
            cmd = "sudo apt-get -y install " + " ".join(need_install)

        for _ in range(max_retry):
            try:
                rossh(cmd)
                break
            except OSError as err:
                time.sleep(timeout)
        else:
            raise OSError("Can't install - " + str(err))

    def pre_run(self):
        prefill = self.config.options.get('prefill_files', True)

        if prefill:
            files = {}
            for cfg_slice in self.fio_configs:
                for section in cfg_slice:
                    sz = ssize2b(section.vals['size'])
                    msz = sz / (1024 ** 2)

                    if sz % (1024 ** 2) != 0:
                        msz += 1

                    fname = section.vals['filename']

                    # if another test uses the same file name,
                    # take the largest size
                    files[fname] = max(files.get(fname, 0), msz)
        else:
            files = None
            logger.warning("Prefilling of test files is disabled")

        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
            fc = functools.partial(self.pre_run_th, files=files)
            list(pool.map(fc, self.config.nodes))

    def pre_run_th(self, node, files):
        # fill files with pseudo-random data
        rossh = run_on_node(node)

        try:
            cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
            if self.use_sudo:
                cmd = "sudo " + cmd
                cmd += " ; sudo chown {0} {1}".format(node.get_user(),
                                                      self.config.remote_dir)

            rossh(cmd)
        except Exception as exc:
            msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
            msg = msg.format(self.config.remote_dir, node.get_conn_id(), exc)
            logger.exception(msg)
            raise StopTestError(msg, exc)

        if files is not None:
            self.prefill_test_files(rossh, files)

        self.install_utils(rossh)

    def run(self):
        if len(self.fio_configs) > 1:
            # +10% is a rough estimation for additional operations
            # like sftp, etc
            exec_time = int(sum(map(execution_time, self.fio_configs)) * 1.1)
            exec_time_s = sec_to_str(exec_time)
            now_dt = datetime.datetime.now()
            end_dt = now_dt + datetime.timedelta(0, exec_time)
            msg = "Entire test should take around {0} and finish at {1}"
            logger.info(msg.format(exec_time_s,
                                   end_dt.strftime("%H:%M:%S")))

        tname = os.path.basename(self.config_fname)
        if tname.endswith('.cfg'):
            tname = tname[:-4]

        barrier = Barrier(len(self.config.nodes))
        results = []

        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
            for pos, fio_cfg in enumerate(self.fio_configs):
                logger.info("Will run {0} test".format(fio_cfg.name))

                templ = "Test should take about {0}." + \
                        " Should finish at {1}," + \
                        " will wait at most till {2}"
                exec_time = execution_time(fio_cfg)
                exec_time_str = sec_to_str(exec_time)
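                # hard deadline: expected run time plus at least five
                # minutes (i.e. roughly 2x for long runs)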
                timeout = int(exec_time + max(300, exec_time))

                now_dt = datetime.datetime.now()
                end_dt = now_dt + datetime.timedelta(0, exec_time)
                wait_till = now_dt + datetime.timedelta(0, timeout)

                logger.info(templ.format(exec_time_str,
                                         end_dt.strftime("%H:%M:%S"),
                                         wait_till.strftime("%H:%M:%S")))

                func = functools.partial(self.do_run,
                                         barrier=barrier,
                                         fio_cfg=fio_cfg,
                                         pos=pos)

                max_retr = 3
                for idx in range(max_retr):
                    try:
                        intervals = list(pool.map(func, self.config.nodes))
                        break
                    except (EnvironmentError, SSHException) as exc:
                        logger.exception("During fio run")
                        if idx == max_retr - 1:
                            raise StopTestError("Fio failed", exc)

                        logger.info("Sleeping 30s and retrying")
                        time.sleep(30)

                fname = "{0}_task.fio".format(pos)
                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
                    fd.write(str(fio_cfg))

                params = {'vm_count': len(self.config.nodes)}
                params['name'] = fio_cfg.name
                params['vals'] = dict(fio_cfg.vals.items())
                params['intervals'] = intervals
                params['nodes'] = [node.get_conn_id() for node in self.config.nodes]

                fname = "{0}_params.yaml".format(pos)
                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
                    fd.write(dumps(params))

                res = load_test_results(IOTestResult, self.config.log_directory, pos)
                results.append(res)
        return results

    def do_run(self, node, barrier, fio_cfg, pos, nolog=False):
        exec_folder = os.path.dirname(self.task_file)
        bash_file = "#!/bin/bash\n" + \
                    "cd {exec_folder}\n" + \
                    "fio --output-format=json --output={out_file} " + \
                    "--alloc-size=262144 {job_file} " + \
                    " >{err_out_file} 2>&1 \n" + \
                    "echo $? >{res_code_file}\n"

        bash_file = bash_file.format(out_file=self.results_file,
                                     job_file=self.task_file,
                                     err_out_file=self.err_out_file,
                                     res_code_file=self.exit_code_file,
                                     exec_folder=exec_folder)
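
        # With remote_dir == "/tmp/wally" (illustrative), the rendered
        # script would look roughly like (fio line wrapped here for
        # readability; the template emits it as one line):
        #
        #     #!/bin/bash
        #     cd /tmp/wally
        #     fio --output-format=json --output=/tmp/wally/results.json
        #         --alloc-size=262144 /tmp/wally/task.cfg >/tmp/wally/fio_err_out 2>&1
        #     echo $? >/tmp/wally/exit_code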

        run_on_node(node)("cd {0} ; rm -rf *".format(exec_folder), nolog=True)

        with node.connection.open_sftp() as sftp:
            save_to_remote(sftp, self.task_file, str(fio_cfg))
            save_to_remote(sftp, self.sh_file, bash_file)

        exec_time = execution_time(fio_cfg)

        timeout = int(exec_time + max(300, exec_time))
        soft_tout = exec_time

        begin = time.time()

        if self.config.options.get("use_sudo", True):
            sudo = "sudo "
        else:
            sudo = ""

        fnames_before = run_on_node(node)("ls -1 " + exec_folder, nolog=True)

        barrier.wait()

        task = BGSSHTask(node, self.config.options.get("use_sudo", True))
        task.start(sudo + "bash " + self.sh_file)

        while True:
            try:
                task.wait(soft_tout, timeout)
                break
            except paramiko.SSHException:
                pass

            try:
                node.connection.close()
            except Exception:
                pass

            reconnect(node.connection, node.conn_url)

        end = time.time()
        rossh = run_on_node(node)
        fnames_after = rossh("ls -1 " + exec_folder, nolog=True)

        conn_id = node.get_conn_id().replace(":", "_")
        if not nolog:
            logger.debug("Test on node {0} is finished".format(conn_id))

        log_files_pref = []
        if 'write_lat_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_lat_log']
            log_files_pref.append(fname + '_clat')
            log_files_pref.append(fname + '_lat')
            log_files_pref.append(fname + '_slat')

        if 'write_iops_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_iops_log']
            log_files_pref.append(fname + '_iops')

        if 'write_bw_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_bw_log']
            log_files_pref.append(fname + '_bw')

        files = collections.defaultdict(list)
        all_files = [os.path.basename(self.results_file)]
        new_files = set(fnames_after.split()) - set(fnames_before.split())
        for fname in new_files:
            if fname.endswith('.log') and fname.split('.')[0] in log_files_pref:
                name, _ = os.path.splitext(fname)
                if fname.count('.') == 1:
                    tp = name.split("_")[-1]
                    cnt = 0
                else:
                    tp_cnt = name.split("_")[-1]
                    tp, cnt = tp_cnt.split('.')
                files[tp].append((int(cnt), fname))
                all_files.append(fname)

        arch_name = self.join_remote('wally_result.tar.gz')
        tmp_dir = os.path.join(self.config.log_directory, 'tmp_' + conn_id)
        os.mkdir(tmp_dir)
        loc_arch_name = os.path.join(tmp_dir, 'wally_result.{0}.tar.gz'.format(conn_id))
        file_full_names = " ".join(all_files)

        try:
            os.unlink(loc_arch_name)
        except OSError:
            pass

        with node.connection.open_sftp() as sftp:
            exit_code = read_from_remote(sftp, self.exit_code_file)
            err_out = read_from_remote(sftp, self.err_out_file)
            exit_code = exit_code.strip()

            if exit_code != '0':
                msg = "fio exit with code {0}: {1}".format(exit_code, err_out)
                logger.critical(msg.strip())
                raise StopTestError("fio failed")

            rossh("rm -f {0}".format(arch_name), nolog=True)
            cmd = "cd {0} ; tar zcvf {1} {2}".format(exec_folder, arch_name, file_full_names)
            rossh(cmd, nolog=True)
            sftp.get(arch_name, loc_arch_name)

        cmd = "cd {0} ; tar xvzf {1} >/dev/null".format(tmp_dir, loc_arch_name)
        subprocess.check_call(cmd, shell=True)
        os.unlink(loc_arch_name)

        for ftype, fls in files.items():
            for idx, fname in fls:
                cname = os.path.join(tmp_dir, fname)
                loc_fname = "{0}_{1}_{2}.{3}.log".format(pos, conn_id, ftype, idx)
                loc_path = os.path.join(self.config.log_directory, loc_fname)
                os.rename(cname, loc_path)

        cname = os.path.join(tmp_dir,
                             os.path.basename(self.results_file))
        loc_fname = "{0}_{1}_rawres.json".format(pos, conn_id)
        loc_path = os.path.join(self.config.log_directory, loc_fname)
        os.rename(cname, loc_path)

        os.rmdir(tmp_dir)
        return begin, end

    @classmethod
    def format_for_console(cls, data, dinfo):
        """
        create a table with io performance report
        for console
        """

        def getconc(data):
            th_count = data.params['vals'].get('numjobs')

            if th_count is None:
                th_count = data.params['vals'].get('concurence', 1)
            return th_count

        def key_func(data):
            p = data.params['vals']

            th_count = getconc(data)

            return (data.name.rsplit("_", 1)[0],
                    p['rw'],
                    get_test_sync_mode(data.params),
                    ssize2b(p['blocksize']),
                    int(th_count) * data.testnodes_count)

        tab = texttable.Texttable(max_width=120)
        tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
        tab.set_cols_align(["l", "l", "r", "r", "r", "r", "r", "r", "r"])

        items = sorted(dinfo.values(), key=key_func)

        prev_k = None
        header = ["Name", "Description", "iops\ncum", "KiBps\ncum",
                  "Cnf\n95%", "Dev%", "iops\nper vm", "KiBps\nper vm", "lat\nms"]
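
        # rows are sorted by key_func and grouped on its first four
        # components (name, rw mode, sync mode, block size); a dashed
        # separator row is emitted whenever that group key changes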
        for data in items:
            curr_k = key_func(data)[:4]

            if prev_k is not None:
                if prev_k != curr_k:
                    tab.add_row(
                        ["-------", "-----------", "-----", "------",
                         "---", "----", "------", "---", "-----"])

            prev_k = curr_k

            test_dinfo = dinfo[(data.name, data.summary)]

            iops, _ = test_dinfo.iops.rounded_average_conf()

            bw, bw_conf = test_dinfo.bw.rounded_average_conf()
            _, bw_dev = test_dinfo.bw.rounded_average_dev()
            conf_perc = int(round(bw_conf * 100 / bw))
            dev_perc = int(round(bw_dev * 100 / bw))

            lat, _ = test_dinfo.lat.rounded_average_conf()
            lat = round_3_digit(int(lat) // 1000)

            iops_per_vm = round_3_digit(iops / data.testnodes_count)
            bw_per_vm = round_3_digit(bw / data.testnodes_count)

            iops = round_3_digit(iops)
            bw = round_3_digit(bw)

            params = (data.name.rsplit('_', 1)[0],
                      data.summary, int(iops), int(bw), str(conf_perc),
                      str(dev_perc),
                      int(iops_per_vm), int(bw_per_vm), lat)
            tab.add_row(params)

        tab.header(header)

        return tab.draw()