import re
import time
import json
import os.path
import logging
import datetime
import functools
import subprocess
import collections

import yaml
import paramiko
import texttable
from paramiko.ssh_exception import SSHException
from concurrent.futures import ThreadPoolExecutor

from wally.pretty_yaml import dumps
from wally.statistic import round_3_digit, data_property
from wally.utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask,
                             reconnect)

from .fio_task_parser import (execution_time, fio_cfg_compile,
                              get_test_summary, get_test_sync_mode)
from ..itest import TimeSeriesValue, PerfTest, TestResults, run_on_node

logger = logging.getLogger("wally")


# Results folder structure
# results/
#     {loadtype}_{num}/
#         config.yaml
#         ......

class NoData(object):
    pass


def cached_prop(func):
    @property
    @functools.wraps(func)
    def closure(self):
        val = getattr(self, "_" + func.__name__)
        if val is NoData:
            val = func(self)
            setattr(self, "_" + func.__name__, val)
        return val
    return closure

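
# Usage sketch for cached_prop (illustrative only): the backing attribute
# must be pre-seeded with the NoData sentinel class.
#
#     class Example(object):
#         _total = NoData
#
#         @cached_prop
#         def total(self):
#             return expensive_computation()   # hypothetical helper
#
# The first access computes and stores the value in self._total; later
# accesses return the cached result.
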

def load_fio_log_file(fname):
    # Each line of a fio time-series log is expected to start with
    # "<time_ms>, <value>"; any further columns are ignored.
    # Offsets are converted from milliseconds to seconds.
    with open(fname) as fd:
        it = [ln.split(',')[:2] for ln in fd]
    vals = [(float(off) / 1000, float(val.strip())) for off, val in it]
    return TimeSeriesValue(vals)


def load_test_results(cls, folder, run_num):
    """Load results of one test run (run_num) from folder and return
    cls(params, ts_results, raw_results)."""
    res = {}

    fn = os.path.join(folder, str(run_num) + '_params.yaml')
    params = yaml.load(open(fn).read())

    conn_ids = set()
    for fname in os.listdir(folder):
        rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
        rm = re.match(rr, fname)
        if rm is None:
            continue

        conn_id_s = rm.group('conn_id')
        conn_id = conn_id_s.replace('_', ':')
        ftype = rm.group('type')

        if ftype not in ('iops', 'bw', 'lat'):
            continue

        try:
            ts = load_fio_log_file(os.path.join(folder, fname))
            if ftype in res:
                assert conn_id not in res[ftype]

            res.setdefault(ftype, {})[conn_id] = ts
        except AssertionError:
            pass

        conn_ids.add(conn_id)

    raw_res = {}
    for conn_id in conn_ids:
        # file names use '_' instead of ':' in connection ids
        conn_id_s = conn_id.replace(':', '_')
        fn = os.path.join(folder,
                          "{0}_{1}_rawres.json".format(run_num, conn_id_s))

        # cut off anything fio printed before the actual json document
        fc = "{" + open(fn).read().split('{', 1)[1]
        raw_res[conn_id] = json.loads(fc)

    return cls(params, res, raw_res)

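
# On-disk layout consumed by load_test_results, per run number NUM and
# connection id CONN (with ':' replaced by '_'):
#
#     NUM_params.yaml              - dump of the test parameters
#     NUM_CONN_rawres.json         - fio json output
#     NUM_CONN_{iops,bw,lat}.N.log - fio time-series logs
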

class Attrmapper(object):
    def __init__(self, dct):
        self.__dct = dct

    def __getattr__(self, name):
        try:
            return self.__dct[name]
        except KeyError:
            raise AttributeError(name)

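
# e.g. Attrmapper({'blocksize': '4k'}).blocksize == '4k'; missing keys
# raise AttributeError, so hasattr() works as expected.
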

class DiskPerfInfo(object):
    def __init__(self, name, summary, params, testnodes_count):
        self.name = name
        self.bw = None
        self.iops = None
        self.lat = None
        self.lat_50 = None
        self.lat_95 = None

        self.raw_bw = []
        self.raw_iops = []
        self.raw_lat = []

        self.params = params
        self.testnodes_count = testnodes_count
        self.summary = summary
        self.p = Attrmapper(self.params['vals'])

        self.sync_mode = get_test_sync_mode(self.params['vals'])
        self.concurence = self.params['vals'].get('numjobs', 1)


def get_lat_perc_50_95(lat_mks):
    """Get 50th and 95th latency percentiles (in ms) from a histogram,
    where keys are bucket boundaries in microseconds and values are
    percentages of requests which fell into each bucket."""
    curr_perc = 0
    perc_50 = None
    perc_95 = None
    pkey = None
    for key, val in sorted(lat_mks.items()):
        if curr_perc + val >= 50 and perc_50 is None:
            if pkey is None or val < 1.:
                perc_50 = key
            else:
                perc_50 = (50. - curr_perc) / val * (key - pkey) + pkey

        if curr_perc + val >= 95:
            if pkey is None or val < 1.:
                perc_95 = key
            else:
                perc_95 = (95. - curr_perc) / val * (key - pkey) + pkey
            break

        pkey = key
        curr_perc += val

    return perc_50 / 1000., perc_95 / 1000.

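
# Worked example: for lat_mks = {1000: 60.0, 2000: 40.0} the 50th
# percentile falls into the first bucket, so perc_50 = 1000 us = 1.0 ms;
# the 95th is linearly interpolated inside the second bucket:
# (95 - 60) / 40 * (2000 - 1000) + 1000 = 1875 us = 1.875 ms.
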

def prepare(ramp_time, data, avg_interval):
    """Drop the warm-up (ramp_time) part of each time series and
    resample the rest with avg_interval step."""
    if data is None:
        return data

    res = {}
    for key, ts_data in data.items():
        if ramp_time > 0:
            ts_data = ts_data.skip(ramp_time)

        res[key] = ts_data.derived(avg_interval)
    return res


class IOTestResult(TestResults):
    """
    Fio run results
    config: TestConfig
    fio_task: FioJobSection
    ts_results: {str: TimeSeriesValue}
    raw_result: ????
    run_interval: (float, float) - test run time, used for sensors
    """
    def __init__(self, config, fio_task, ts_results, raw_result, run_interval):

        self.name = fio_task.name.split("_")[0]
        self.fio_task = fio_task

        ramp_time = fio_task.vals.get('ramp_time', 0)

        self.bw = prepare(ramp_time, ts_results.get('bw'), 1.0)
        self.lat = prepare(ramp_time, ts_results.get('lat'), 1.0)
        self.iops = prepare(ramp_time, ts_results.get('iops'), 1.0)
        # self.slat = drop_warmup(res.get('slat', None), self.params)
        # self.clat = drop_warmup(res.get('clat', None), self.params)

        res = {"bw": self.bw, "lat": self.lat, "iops": self.iops}

        self.sensors_data = None
        self._pinfo = None
        TestResults.__init__(self, config, res, raw_result, run_interval)

    def summary(self):
        return get_test_summary(self.fio_task) + "vm" \
               + str(len(self.config.nodes))

    def get_yamable(self):
        return self.summary()

    @property
    def disk_perf_info(self):
        if self._pinfo is not None:
            return self._pinfo

        # merge the latency histograms from all jobs on all nodes into
        # one microsecond-keyed histogram with averaged percentages
        lat_mks = collections.defaultdict(lambda: 0)
        num_res = 0

        for _, result in self.raw_result.items():
            num_res += len(result['jobs'])
            for job_info in result['jobs']:
                for k, v in job_info['latency_ms'].items():
                    if isinstance(k, basestring) and k.startswith('>='):
                        lat_mks[int(k[2:]) * 1000] += v
                    else:
                        lat_mks[int(k) * 1000] += v

                for k, v in job_info['latency_us'].items():
                    lat_mks[int(k)] += v

        for k, v in lat_mks.items():
            lat_mks[k] = float(v) / num_res

        testnodes_count = len(self.raw_result)

        pinfo = DiskPerfInfo(self.name,
                             self.summary(),
                             self.params,
                             testnodes_count)

        pinfo.raw_bw = [res.vals() for res in self.bw.values()]
        pinfo.raw_iops = [res.vals() for res in self.iops.values()]
        pinfo.raw_lat = [res.vals() for res in self.lat.values()]

        pinfo.bw = data_property(map(sum, zip(*pinfo.raw_bw)))
        pinfo.iops = data_property(map(sum, zip(*pinfo.raw_iops)))
        pinfo.lat = data_property(sum(pinfo.raw_lat, []))
        pinfo.lat_50, pinfo.lat_95 = get_lat_perc_50_95(lat_mks)

        self._pinfo = pinfo

        return pinfo

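
# Note on the histogram merge above: fio's json output provides two
# per-job dicts, latency_us and latency_ms, mapping bucket labels to the
# percentage of requests in that bucket; the millisecond dict may carry a
# ">=NNN" label for the overflow bucket (hence the startswith('>=')
# check), e.g. {"2": 12.5, ..., ">=2000": 0.01} (values illustrative).
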

class IOPerfTest(PerfTest):
    tcp_conn_timeout = 30
    max_pig_timeout = 5
    soft_runcycle = 5 * 60

    def __init__(self, config):
        PerfTest.__init__(self, config)

        get = self.config.params.get
        do_get = self.config.params.__getitem__

        self.config_fname = do_get('cfg')

        if '/' not in self.config_fname and '.' not in self.config_fname:
            cfgs_dir = os.path.dirname(__file__)
            self.config_fname = os.path.join(cfgs_dir,
                                             self.config_fname + '.cfg')

        self.alive_check_interval = get('alive_check_interval')
        self.use_system_fio = get('use_system_fio', False)

        self.config_params = get('params', {}).copy()

        self.io_py_remote = self.join_remote("agent.py")
        self.results_file = self.join_remote("results.json")
        self.pid_file = self.join_remote("pid")
        self.task_file = self.join_remote("task.cfg")
        self.sh_file = self.join_remote("cmd.sh")
        self.err_out_file = self.join_remote("fio_err_out")
        self.exit_code_file = self.join_remote("exit_code")

        self.use_sudo = get("use_sudo", True)
        self.test_logging = get("test_logging", False)

        self.raw_cfg = open(self.config_fname).read()
        self.fio_configs = fio_cfg_compile(self.raw_cfg,
                                           self.config_fname,
                                           self.config_params,
                                           split_on_names=self.test_logging)
        self.fio_configs = list(self.fio_configs)
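
    # Config options consumed by this test (read above and in
    # pre_run/do_run): cfg, alive_check_interval, use_system_fio, params,
    # use_sudo, test_logging, prefill_files.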

    @classmethod
    def load(cls, folder):
        for fname in os.listdir(folder):
            if re.match(r"\d+_params\.yaml$", fname):
                num = int(fname.split('_')[0])
                yield load_test_results(IOTestResult, folder, num)

    def cleanup(self):
        # delete_file(conn, self.io_py_remote)
        # Need to remove temp files, used for testing
        pass

    def prefill_test_files(self, files, rossh):
        cmd_templ = "fio --name=xxx --filename={0} --direct=1" + \
                    " --bs=4m --size={1}m --rw=write"

        if self.use_sudo:
            cmd_templ = "sudo " + cmd_templ

        ssize = 0
        stime = time.time()

        for fname, curr_sz in files.items():
            cmd = cmd_templ.format(fname, curr_sz)
            ssize += curr_sz

            # timeout in seconds == size in MiB, i.e. we assume
            # a fill rate of at least 1 MiBps
            rossh(cmd, timeout=curr_sz)

        ddtime = time.time() - stime
        if ddtime > 1E-3:
            fill_bw = int(ssize / ddtime)
            mess = "Initial fill bw is {0} MiBps for this vm"
            logger.info(mess.format(fill_bw))
            return fill_bw

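    # For files = {'/dev/sdb': 1024} (hypothetical) the generated command
    # above expands to:
    #   sudo fio --name=xxx --filename=/dev/sdb --direct=1 --bs=4m \
    #       --size=1024m --rw=write
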
    def install_utils(self, rossh, max_retry=3, timeout=5):
        need_install = []
        packs = [('screen', 'screen')]

        if self.use_system_fio:
            packs.append(('fio', 'fio'))
        else:
            # TODO: detect OS and x32/x64,
            # copy the appropriate fio binary,
            # add fio deps
            pass

        for bin_name, package in packs:
            if bin_name is None:
                need_install.append(package)
                continue

            try:
                rossh('which ' + bin_name, nolog=True)
            except OSError:
                need_install.append(package)

        if len(need_install) == 0:
            return

        if 'redhat' == get_os(rossh):
            cmd = "sudo yum -y install " + " ".join(need_install)
        else:
            cmd = "sudo apt-get -y install " + " ".join(need_install)

        err = None
        for _ in range(max_retry):
            try:
                rossh(cmd)
                break
            except OSError as exc:
                err = exc
                time.sleep(timeout)
        else:
            raise OSError("Can't install - " + str(err))

    def pre_run(self):
        prefill = self.config.options.get('prefill_files', True)

        if prefill:
            files = {}
            for cfg_slice in self.fio_configs:
                for section in cfg_slice:
                    sz = ssize2b(section.vals['size'])
                    msz = sz / (1024 ** 2)

                    # round size up to the next MiB
                    if sz % (1024 ** 2) != 0:
                        msz += 1

                    fname = section.vals['filename']

                    # if another test uses the same file name,
                    # take the largest size
                    files[fname] = max(files.get(fname, 0), msz)
        else:
            files = None
            logger.warning("Prefilling of test files is disabled")

        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
            fc = functools.partial(self.pre_run_th, files=files)
            list(pool.map(fc, self.config.nodes))

    def pre_run_th(self, node, files):
        rossh = run_on_node(node)

        try:
            cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
            if self.use_sudo:
                cmd = "sudo " + cmd
                cmd += " ; sudo chown {0} {1}".format(node.get_user(),
                                                      self.config.remote_dir)

            rossh(cmd)
        except Exception as exc:
            msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
            msg = msg.format(self.config.remote_dir, node.get_conn_id(), exc)
            logger.exception(msg)
            raise StopTestError(msg, exc)

        if files is not None:
            # fill test files with pseudo-random data
            self.prefill_test_files(rossh, files)

        self.install_utils(rossh)

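    # With use_sudo=True and remote_dir=/tmp/wally (hypothetical path) the
    # folder-creation command above expands to:
    #   sudo mkdir -p "/tmp/wally" ; sudo chown <user> /tmp/wally
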
    def run(self):
        if len(self.fio_configs) > 1:
            # +10% - is a rough estimation for additional operations
            # like sftp, etc
            exec_time = int(sum(map(execution_time, self.fio_configs)) * 1.1)
            exec_time_s = sec_to_str(exec_time)
            now_dt = datetime.datetime.now()
            end_dt = now_dt + datetime.timedelta(0, exec_time)
            msg = "Entire test should take around {0} and finish at {1}"
            logger.info(msg.format(exec_time_s,
                                   end_dt.strftime("%H:%M:%S")))

        tname = os.path.basename(self.config_fname)
        if tname.endswith('.cfg'):
            tname = tname[:-4]

        barrier = Barrier(len(self.config.nodes))
        results = []

        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
            for pos, fio_cfg in enumerate(self.fio_configs):
                logger.info("Will run {0} test".format(fio_cfg.name))

                templ = "Test should take about {0}." + \
                        " Should finish at {1}," + \
                        " will wait at most till {2}"
                exec_time = execution_time(fio_cfg)
                exec_time_str = sec_to_str(exec_time)
                timeout = int(exec_time + max(300, exec_time))

                now_dt = datetime.datetime.now()
                end_dt = now_dt + datetime.timedelta(0, exec_time)
                wait_till = now_dt + datetime.timedelta(0, timeout)

                logger.info(templ.format(exec_time_str,
                                         end_dt.strftime("%H:%M:%S"),
                                         wait_till.strftime("%H:%M:%S")))

                func = functools.partial(self.do_run,
                                         barrier=barrier,
                                         fio_cfg=fio_cfg,
                                         pos=pos)

                max_retr = 3
                for idx in range(max_retr):
                    try:
                        intervals = list(pool.map(func, self.config.nodes))
                        break
                    except (EnvironmentError, SSHException) as exc:
                        logger.exception("Exception during fio run")
                        if idx == max_retr - 1:
                            raise StopTestError("Fio failed", exc)

                        logger.info("Sleeping 30s and retrying")
                        time.sleep(30)

                fname = "{0}_task.fio".format(pos)
                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
                    fd.write(str(fio_cfg))

                params = {'vm_count': len(self.config.nodes)}
                params['name'] = fio_cfg.name
                params['vals'] = dict(fio_cfg.vals.items())
                params['intervals'] = intervals
                params['nodes'] = [node.get_conn_id() for node in self.config.nodes]

                fname = "{0}_params.yaml".format(pos)
                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
                    fd.write(dumps(params))

                res = load_test_results(IOTestResult,
                                        self.config.log_directory, pos)
                results.append(res)
        return results

    def do_run(self, node, barrier, fio_cfg, pos, nolog=False):
        exec_folder = os.path.dirname(self.task_file)
        bash_file = "#!/bin/bash\n" + \
                    "cd {exec_folder}\n" + \
                    "fio --output-format=json --output={out_file} " + \
                    "--alloc-size=262144 {job_file} " + \
                    " >{err_out_file} 2>&1 \n" + \
                    "echo $? >{res_code_file}\n"

        bash_file = bash_file.format(out_file=self.results_file,
                                     job_file=self.task_file,
                                     err_out_file=self.err_out_file,
                                     res_code_file=self.exit_code_file,
                                     exec_folder=exec_folder)
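        # Rendered script (illustrative; the actual paths come from
        # config.remote_dir via join_remote, /tmp/wally is hypothetical):
        #   #!/bin/bash
        #   cd /tmp/wally
        #   fio --output-format=json --output=/tmp/wally/results.json \
        #       --alloc-size=262144 /tmp/wally/task.cfg \
        #       >/tmp/wally/fio_err_out 2>&1
        #   echo $? >/tmp/wally/exit_code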

        assert exec_folder != "" and exec_folder != "/"
        run_on_node(node)("rm -rf {0}/*".format(exec_folder), nolog=True)

        with node.connection.open_sftp() as sftp:
            save_to_remote(sftp, self.task_file, str(fio_cfg))
            save_to_remote(sftp, self.sh_file, bash_file)

        exec_time = execution_time(fio_cfg)

        timeout = int(exec_time + max(300, exec_time))
        soft_tout = exec_time

        begin = time.time()

        if self.config.options.get("use_sudo", True):
            sudo = "sudo "
        else:
            sudo = ""

        fnames_before = run_on_node(node)("ls -1 " + exec_folder, nolog=True)

        barrier.wait()

        task = BGSSHTask(node, self.config.options.get("use_sudo", True))
        task.start(sudo + "bash " + self.sh_file)

        while True:
            try:
                task.wait(soft_tout, timeout)
                break
            except paramiko.SSHException:
                pass

            # the ssh connection died while waiting - reconnect
            # and keep waiting
            try:
                node.connection.close()
            except Exception:
                pass

            reconnect(node.connection, node.conn_url)

        end = time.time()
        rossh = run_on_node(node)
        fnames_after = rossh("ls -1 " + exec_folder, nolog=True)

        conn_id = node.get_conn_id().replace(":", "_")
        if not nolog:
            logger.debug("Test on node {0} is finished".format(conn_id))

        log_files_pref = []
        if 'write_lat_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_lat_log']
            log_files_pref.append(fname + '_clat')
            log_files_pref.append(fname + '_lat')
            log_files_pref.append(fname + '_slat')

        if 'write_iops_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_iops_log']
            log_files_pref.append(fname + '_iops')

        if 'write_bw_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_bw_log']
            log_files_pref.append(fname + '_bw')

        files = collections.defaultdict(lambda: [])
        all_files = [os.path.basename(self.results_file)]
        new_files = set(fnames_after.split()) - set(fnames_before.split())
        for fname in new_files:
            if fname.endswith('.log') and fname.split('.')[0] in log_files_pref:
                name, _ = os.path.splitext(fname)
                if fname.count('.') == 1:
                    tp = name.split("_")[-1]
                    cnt = 0
                else:
                    tp_cnt = name.split("_")[-1]
                    tp, cnt = tp_cnt.split('.')
                files[tp].append((int(cnt), fname))
                all_files.append(fname)
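
        # e.g. "xxx_lat.log" parses to tp="lat", cnt=0, while per-thread
        # logs like "xxx_lat.1.log" parse to tp="lat", cnt=1
        # ("xxx" here stands for a hypothetical write_lat_log prefix)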

        arch_name = self.join_remote('wally_result.tar.gz')
        tmp_dir = os.path.join(self.config.log_directory, 'tmp_' + conn_id)
        os.mkdir(tmp_dir)
        loc_arch_name = os.path.join(tmp_dir,
                                     'wally_result.{0}.tar.gz'.format(conn_id))
        file_full_names = " ".join(all_files)

        try:
            os.unlink(loc_arch_name)
        except OSError:
            pass

        with node.connection.open_sftp() as sftp:
            exit_code = read_from_remote(sftp, self.exit_code_file)
            err_out = read_from_remote(sftp, self.err_out_file)
            exit_code = exit_code.strip()

            if exit_code != '0':
                msg = "fio exit with code {0}: {1}".format(exit_code, err_out)
                logger.critical(msg.strip())
                raise StopTestError("fio failed")

            rossh("rm -f {0}".format(arch_name), nolog=True)
            cmd = "cd {0} ; tar zcvf {1} {2}".format(exec_folder, arch_name,
                                                     file_full_names)
            rossh(cmd, nolog=True)
            sftp.get(arch_name, loc_arch_name)

        cmd = "cd {0} ; tar xvzf {1} >/dev/null".format(tmp_dir, loc_arch_name)
        subprocess.check_call(cmd, shell=True)
        os.unlink(loc_arch_name)

        for ftype, fls in files.items():
            for idx, fname in fls:
                cname = os.path.join(tmp_dir, fname)
                loc_fname = "{0}_{1}_{2}.{3}.log".format(pos, conn_id,
                                                         ftype, idx)
                loc_path = os.path.join(self.config.log_directory, loc_fname)
                os.rename(cname, loc_path)

        cname = os.path.join(tmp_dir,
                             os.path.basename(self.results_file))
        loc_fname = "{0}_{1}_rawres.json".format(pos, conn_id)
        loc_path = os.path.join(self.config.log_directory, loc_fname)
        os.rename(cname, loc_path)

        os.rmdir(tmp_dir)
        return begin, end

    @classmethod
    def format_for_console(cls, data, dinfo):
        """
        Render an io performance report as a text table
        for console output
        """

        def getconc(data):
            th_count = data.params['vals'].get('numjobs')

            if th_count is None:
                th_count = data.params['vals'].get('concurence', 1)
            return th_count

        def key_func(data):
            p = data.params['vals']

            th_count = getconc(data)

            return (data.name.rsplit("_", 1)[0],
                    p['rw'],
                    get_test_sync_mode(data.params),
                    ssize2b(p['blocksize']),
                    int(th_count) * data.testnodes_count)

        tab = texttable.Texttable(max_width=120)
        tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
        tab.set_cols_align(["l", "l", "r", "r", "r", "r", "r", "r", "r"])

        items = sorted(dinfo.values(), key=key_func)

        prev_k = None
        header = ["Name", "Description", "iops\ncum", "KiBps\ncum",
                  "Cnf\n95%", "Dev%", "iops\nper vm", "KiBps\nper vm",
                  "lat\nms"]

        for data in items:
            curr_k = key_func(data)[:4]

            if prev_k is not None:
                if prev_k != curr_k:
                    tab.add_row(
                        ["-------", "-----------", "-----", "------",
                         "---", "----", "------", "---", "-----"])

            prev_k = curr_k

            test_dinfo = dinfo[(data.name, data.summary)]

            iops, _ = test_dinfo.iops.rounded_average_conf()

            bw, bw_conf = test_dinfo.bw.rounded_average_conf()
            _, bw_dev = test_dinfo.bw.rounded_average_dev()
            conf_perc = int(round(bw_conf * 100 / bw))
            dev_perc = int(round(bw_dev * 100 / bw))

            lat, _ = test_dinfo.lat.rounded_average_conf()
            lat = round_3_digit(int(lat) // 1000)  # us -> ms

            iops_per_vm = round_3_digit(iops / data.testnodes_count)
            bw_per_vm = round_3_digit(bw / data.testnodes_count)

            iops = round_3_digit(iops)
            bw = round_3_digit(bw)

            params = (data.name.rsplit('_', 1)[0],
                      data.summary, int(iops), int(bw), str(conf_perc),
                      str(dev_perc),
                      int(iops_per_vm), int(bw_per_vm), lat)
            tab.add_row(params)

        tab.header(header)

        return tab.draw()