blob: 2a76fc23eaf3771f577d67bb8b3f542a62962677 [file] [log] [blame]
koder aka kdanilovbc2c8982015-06-13 02:50:43 +03001import re
2import time
3import json
4import os.path
5import logging
6import datetime
7import functools
8import subprocess
9import collections
10
11import yaml
12import paramiko
13import texttable
14from paramiko.ssh_exception import SSHException
15from concurrent.futures import ThreadPoolExecutor
16
17from wally.pretty_yaml import dumps
koder aka kdanilovbb6d6cd2015-06-20 02:55:07 +030018from wally.statistic import round_3_digit, data_property, average
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030019from wally.utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
20from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask, reconnect)
21
22from .fio_task_parser import (execution_time, fio_cfg_compile,
koder aka kdanilovbb6d6cd2015-06-20 02:55:07 +030023 get_test_summary, get_test_sync_mode, FioJobSection)
24from ..itest import (TimeSeriesValue, PerfTest, TestResults,
25 run_on_node, TestConfig, MeasurementMatrix)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030026
27logger = logging.getLogger("wally")
28
29
30# Results folder structure
31# results/
32# {loadtype}_{num}/
33# config.yaml
34# ......
35
36
class NoData(object):
    """Sentinel marking a cached-property slot as "not computed yet"."""
    pass


def cached_prop(func):
    """Property decorator that caches the computed value on the instance.

    The backing attribute is named "_" + func.__name__ and must be
    pre-initialized to NoData by the owner class; the wrapped function
    runs at most once, after which the stored value is returned directly.
    """
    attr_name = "_" + func.__name__

    @property
    @functools.wraps(func)
    def closure(self):
        cached = getattr(self, attr_name)
        if cached is NoData:
            cached = func(self)
            setattr(self, attr_name, cached)
        return cached
    return closure
51
52
def load_fio_log_file(fname):
    """Parse a fio-generated *.log file into a TimeSeriesValue.

    Each line looks like "<offset>, <value>, ...": only the first two
    columns are used.  The offset is divided by 1000 (us -> ms per the
    original author's note -- confirm against the fio version in use)
    and 0.5 is added to each value to compensate for fio truncating the
    averaged log values to integers.
    """
    points = []
    with open(fname) as fd:
        for ln in fd:
            off, val = ln.split(',')[:2]
            points.append((float(off) / 1000,
                           float(val.strip()) + 0.5))
    return TimeSeriesValue(points)
63
64
def load_test_results(cls, folder, run_num):
    """Load one stored test run from disk and build a result object.

    cls: result class to instantiate (e.g. IOTestResult)
    folder: directory holding {run_num}_params.yaml, the fio
            "{run_num}_{conn_id}_{type}.{thread}.log" files and the
            "{run_num}_{conn_id}_rawres.json" reports
    run_num: integer run index

    Returns cls(config, fio_task, measurement_matrices, raw_results,
    intervals).
    """
    res = {}

    fn = os.path.join(folder, str(run_num) + '_params.yaml')
    # NOTE: yaml.load of a file this tool wrote itself (trusted input)
    params = yaml.load(open(fn).read())

    conn_ids_set = set()
    rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
    for fname in os.listdir(folder):
        rm = re.match(rr, fname)
        if rm is None:
            continue

        conn_id_s = rm.group('conn_id')
        conn_id = conn_id_s.replace('_', ':')
        ftype = rm.group('type')

        if ftype not in ('iops', 'bw', 'lat'):
            continue

        ts = load_fio_log_file(os.path.join(folder, fname))
        res.setdefault(ftype, {}).setdefault(conn_id, []).append(ts)

        conn_ids_set.add(conn_id)

    # one stable, sorted connection order shared by every metric matrix
    # (hoisted out of the loop below: also keeps conn_ids defined when
    # no log files were found)
    conn_ids = sorted(conn_ids_set)

    mm_res = {}
    for key, data in res.items():
        matr = [data[conn_id] for conn_id in conn_ids]
        mm_res[key] = MeasurementMatrix(matr, conn_ids)

    # derive an iops series from the latency log: 1e6 us / latency_us
    iops_from_lat_matr = []
    for node_ts in mm_res['lat'].data:
        iops_from_lat_matr.append([])
        for thread_ts in node_ts:
            ndt = [(start + ln, 1000000. / val)
                   for (start, ln, val) in thread_ts.data]
            new_ts = TimeSeriesValue(ndt)
            iops_from_lat_matr[-1].append(new_ts)

    mm_res['iops_from_lat'] = MeasurementMatrix(iops_from_lat_matr, conn_ids)

    raw_res = {}
    for conn_id in conn_ids:
        # BUGFIX: previously used the stale conn_id_s left over from the
        # log-scan loop above, so every connection loaded the SAME
        # rawres.json file
        conn_id_s = conn_id.replace(':', '_')
        fn = os.path.join(folder, "{0}_{1}_rawres.json".format(run_num, conn_id_s))

        # remove message hack: drop any fio chatter before the first '{'
        fc = "{" + open(fn).read().split('{', 1)[1]
        raw_res[conn_id] = json.loads(fc)

    fio_task = FioJobSection(params['name'])
    fio_task.vals.update(params['vals'])

    config = TestConfig('io', params, None, params['nodes'], folder, None)
    return cls(config, fio_task, mm_res, raw_res, params['intervals'])
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300123
124
class Attrmapper(object):
    """Read-only attribute-style view over a dictionary.

    Missing keys surface as AttributeError so the object behaves like a
    plain namespace.
    """
    def __init__(self, dct):
        self.__dct = dct

    def __getattr__(self, name):
        # only called for names not found on the instance itself
        if name in self.__dct:
            return self.__dct[name]
        raise AttributeError(name)
134
135
class DiskPerfInfo(object):
    """Aggregated performance numbers for one fio test run.

    The statistic fields (bw/iops/lat/lat_50/lat_95) and the raw_*
    series start empty and are filled in later by
    IOTestResult.disk_perf_info().
    """
    def __init__(self, name, summary, params, testnodes_count):
        self.name = name

        # aggregated statistics, populated after construction
        self.bw = self.iops = self.lat = None
        self.lat_50 = self.lat_95 = None

        # per-vm raw time-series data, populated after construction
        self.raw_bw, self.raw_iops, self.raw_lat = [], [], []

        self.params = params
        self.testnodes_count = testnodes_count
        self.summary = summary

        # attribute-style access to the fio job parameters
        vals = self.params['vals']
        self.p = Attrmapper(vals)

        self.sync_mode = get_test_sync_mode(vals)
        self.concurence = vals.get('numjobs', 1)
156
157
def get_lat_perc_50_95(lat_mks):
    """Estimate the 50th and 95th latency percentiles in milliseconds.

    lat_mks: {latency_us: percent_of_samples} histogram; the percents
    are expected to sum to ~100.  Buckets are walked in increasing
    latency order and the percentile is linearly interpolated between
    the previous and current bucket keys (except for the first bucket
    or near-empty buckets, where the bucket key is used as-is).
    """
    cumulative = 0
    perc_50 = perc_95 = None
    prev_key = None

    for key, val in sorted(lat_mks.items()):
        if perc_50 is None and cumulative + val >= 50:
            if prev_key is None or val < 1.:
                perc_50 = key
            else:
                perc_50 = (50. - cumulative) / val * (key - prev_key) + prev_key

        if cumulative + val >= 95:
            if prev_key is None or val < 1.:
                perc_95 = key
            else:
                perc_95 = (95. - cumulative) / val * (key - prev_key) + prev_key
            break

        prev_key = key
        cumulative += val

    # convert us -> ms
    return perc_50 / 1000., perc_95 / 1000.
181
182
class IOTestResult(TestResults):
    """
    Fio run results

    config: TestConfig
    fio_task: FioJobSection
    ts_results: {str: MeasurementMatrix[TimeSeriesValue]} - per-metric
        time series parsed from the fio *.log files
    raw_result: {conn_id: parsed fio json report} (judging by the
        ['jobs'][0]['mixed'] access in get_params_from_fio_report)
    run_interval: (float, float) - test run time, used for sensors
    """
    def __init__(self, config, fio_task, ts_results, raw_result, run_interval):

        # first "_"-separated token of the section name identifies the test
        self.name = fio_task.name.split("_")[0]
        self.fio_task = fio_task

        # time-series matrices; any metric may be absent (None)
        self.bw = ts_results.get('bw')
        self.lat = ts_results.get('lat')
        self.iops = ts_results.get('iops')
        self.iops_from_lat = ts_results.get('iops_from_lat')

        # self.slat = drop_warmup(res.get('clat', None), self.params)
        # self.clat = drop_warmup(res.get('slat', None), self.params)

        res = {"bw": self.bw, "lat": self.lat, "iops": self.iops}

        self.sensors_data = None
        # cache for disk_perf_info(); filled on first call
        self._pinfo = None
        TestResults.__init__(self, config, res, raw_result, run_interval)

    def get_params_from_fio_report(self):
        """Extract per-node summary numbers from the raw fio json reports.

        Returns {'iops', 'flt_iops', 'bw', 'flt_bw', 'lat'}, each a list
        with one value per node.  The flt_* variants are recomputed as
        total work / runtime instead of trusting fio's own averages.
        """
        nodes = self.bw.connections_ids

        iops = [self.raw_result[node]['jobs'][0]['mixed']['iops'] for node in nodes]
        total_ios = [self.raw_result[node]['jobs'][0]['mixed']['total_ios'] for node in nodes]
        # fio reports runtime in milliseconds
        runtime = [self.raw_result[node]['jobs'][0]['mixed']['runtime'] / 1000 for node in nodes]
        flt_iops = [float(ios) / rtime for ios, rtime in zip(total_ios, runtime)]

        bw = [self.raw_result[node]['jobs'][0]['mixed']['bw'] for node in nodes]
        total_bytes = [self.raw_result[node]['jobs'][0]['mixed']['io_bytes'] for node in nodes]
        flt_bw = [float(tbytes) / rtime for tbytes, rtime in zip(total_bytes, runtime)]

        lat = [self.raw_result[node]['jobs'][0]['mixed']['lat'] for node in nodes]

        return {'iops': iops,
                'flt_iops': flt_iops,
                'bw': bw,
                'flt_bw': flt_bw,
                'lat': lat}

    def summary(self):
        """Short human-readable id: fio summary + "vm" + node count."""
        return get_test_summary(self.fio_task) + "vm" \
               + str(len(self.config.nodes))

    def get_yamable(self):
        # serialized form used when dumping results to yaml
        return self.summary()

    def disk_perf_info(self, avg_interval=5.0):
        """Build (and cache) a DiskPerfInfo with aggregated statistics.

        avg_interval: minimal averaging window (seconds) for the raw
        time series before aggregation.
        """
        if self._pinfo is not None:
            return self._pinfo

        # merge the per-job latency histograms of all nodes into a
        # single {latency_us: percent} histogram
        lat_mks = collections.defaultdict(lambda: 0)
        num_res = 0

        for _, result in self.raw_result.items():
            num_res += len(result['jobs'])
            for job_info in result['jobs']:
                for k, v in job_info['latency_ms'].items():
                    # fio's top ms bucket is spelled ">=NNN"
                    if isinstance(k, basestring) and k.startswith('>='):
                        lat_mks[int(k[2:]) * 1000] += v
                    else:
                        lat_mks[int(k) * 1000] += v

                for k, v in job_info['latency_us'].items():
                    lat_mks[int(k)] += v

        # normalize summed percents back to a single-job scale
        for k, v in lat_mks.items():
            lat_mks[k] = float(v) / num_res

        testnodes_count = len(self.config.nodes)

        pinfo = DiskPerfInfo(self.name,
                             self.summary(),
                             self.params,
                             testnodes_count)

        # ramp_time = self.fio_task.vals.get('ramp_time', 0)

        def prepare(data):
            # re-average each thread's time series to avg_interval and
            # return plain value lists (None passes through)
            if data is None:
                return data

            res = []
            for ts_data in data:
                # if ramp_time > 0:
                #     ts_data = ts_data.skip(ramp_time)

                if ts_data.average_interval() < avg_interval:
                    ts_data = ts_data.derived(avg_interval)

                res.append(ts_data.values)
            return res

        def agg_data(matr):
            # element-wise sum across all threads/nodes, truncated to
            # the shortest series
            arr = sum(matr, [])
            min_len = min(map(len, arr))
            res = []
            for idx in range(min_len):
                res.append(sum(dt[idx] for dt in arr))
            return res

        pinfo.raw_lat = map(prepare, self.lat.per_vm())
        num_th = sum(map(len, pinfo.raw_lat))
        # latency is averaged, not summed, across threads
        avg_lat = [val / num_th for val in agg_data(pinfo.raw_lat)]
        pinfo.lat = data_property(avg_lat)
        pinfo.lat_50, pinfo.lat_95 = get_lat_perc_50_95(lat_mks)

        pinfo.raw_bw = map(prepare, self.bw.per_vm())
        pinfo.raw_iops = map(prepare, self.iops.per_vm())

        iops_per_th = sum(sum(pinfo.raw_iops, []), [])

        fparams = self.get_params_from_fio_report()
        fio_report_bw = sum(fparams['flt_bw'])
        fio_report_iops = sum(fparams['flt_iops'])

        agg_bw = agg_data(pinfo.raw_bw)
        agg_iops = agg_data(pinfo.raw_iops)

        log_bw_avg = average(agg_bw)
        log_iops_avg = average(agg_iops)

        # update values to match average from fio report
        coef_iops = fio_report_iops / float(log_iops_avg)
        coef_bw = fio_report_bw / float(log_bw_avg)

        bw_log = data_property([val * coef_bw for val in agg_bw])
        iops_log = data_property([val * coef_iops for val in agg_iops])

        bw_report = data_property([fio_report_bw])
        iops_report = data_property([fio_report_iops])

        # When IOPS/BW per thread is too low
        # data from logs is rounded to match
        if average(iops_per_th) > 10:
            pinfo.bw = bw_log
            pinfo.iops = iops_log
            pinfo.bw2 = bw_report
            pinfo.iops2 = iops_report
        else:
            pinfo.bw = bw_report
            pinfo.iops = iops_report
            pinfo.bw2 = bw_log
            pinfo.iops2 = iops_log

        self._pinfo = pinfo

        return pinfo
340
341
class IOPerfTest(PerfTest):
    """Run fio-based IO load tests on a set of remote nodes.

    Compiles the fio config, prefills test files, installs required
    tools, executes each config section on every node in parallel and
    collects the resulting logs/reports into the local log directory.
    """

    tcp_conn_timeout = 30
    max_pig_timeout = 5
    soft_runcycle = 5 * 60

    def __init__(self, config):
        PerfTest.__init__(self, config)

        get = self.config.params.get
        do_get = self.config.params.__getitem__

        self.config_fname = do_get('cfg')

        # a bare name (no path, no extension) refers to a .cfg file
        # bundled next to this module
        if '/' not in self.config_fname and '.' not in self.config_fname:
            cfgs_dir = os.path.dirname(__file__)
            self.config_fname = os.path.join(cfgs_dir,
                                             self.config_fname + '.cfg')

        self.alive_check_interval = get('alive_check_interval')
        self.use_system_fio = get('use_system_fio', False)

        self.config_params = get('params', {}).copy()

        # remote file locations used by the generated runner script
        self.io_py_remote = self.join_remote("agent.py")
        self.results_file = self.join_remote("results.json")
        self.pid_file = self.join_remote("pid")
        self.task_file = self.join_remote("task.cfg")
        self.sh_file = self.join_remote("cmd.sh")
        self.err_out_file = self.join_remote("fio_err_out")
        self.exit_code_file = self.join_remote("exit_code")

        # thresholds used by run() to skip heavier thread counts once a
        # load type gets too slow
        self.max_latency = get("max_lat", None)
        self.min_bw_per_thread = get("max_bw", None)

        self.use_sudo = get("use_sudo", True)
        self.test_logging = get("test_logging", False)

        self.raw_cfg = open(self.config_fname).read()
        self.fio_configs = fio_cfg_compile(self.raw_cfg,
                                           self.config_fname,
                                           self.config_params,
                                           split_on_names=self.test_logging)
        self.fio_configs = list(self.fio_configs)

    @classmethod
    def load(cls, folder):
        """Yield an IOTestResult for every saved run found in folder."""
        for fname in os.listdir(folder):
            # BUGFIX: raw string + escaped dot, the old pattern
            # "\d+_params.yaml$" matched any character in place of '.'
            if re.match(r"\d+_params\.yaml$", fname):
                num = int(fname.split('_')[0])
                yield load_test_results(IOTestResult, folder, num)

    def cleanup(self):
        # delete_file(conn, self.io_py_remote)
        # Need to remove tempo files, used for testing
        pass

    def prefill_test_files(self, files, rossh):
        """Fill the test files with data via a sequential fio write.

        files: {fname: size_in_mib}
        rossh: callable executing a shell command on the node
        Returns the observed fill bandwidth in MiBps, or None when the
        run was too fast to measure.
        """
        cmd_templ = "fio --name=xxx --filename={0} --direct=1" + \
                    " --bs=4m --size={1}m --rw=write"

        if self.use_sudo:
            cmd_templ = "sudo " + cmd_templ

        ssize = 0
        stime = time.time()

        for fname, curr_sz in files.items():
            cmd = cmd_templ.format(fname, curr_sz)
            ssize += curr_sz

            # timeout scales with the file size (one second per MiB)
            rossh(cmd, timeout=curr_sz)

        ddtime = time.time() - stime
        if ddtime > 1E-3:
            fill_bw = int(ssize / ddtime)
            mess = "Initiall dd fill bw is {0} MiBps for this vm"
            logger.info(mess.format(fill_bw))
            return fill_bw

    def install_utils(self, rossh, max_retry=3, timeout=5):
        """Install packages required on the node (screen, optionally fio).

        Retries the package-manager command up to max_retry times,
        sleeping `timeout` seconds between attempts; raises OSError when
        every attempt failed.
        """
        need_install = []
        packs = [('screen', 'screen')]

        if self.use_system_fio:
            packs.append(('fio', 'fio'))
        else:
            # define OS and x32/x64
            # copy appropriate fio
            # add fio deps
            pass

        for bin_name, package in packs:
            if bin_name is None:
                need_install.append(package)
                continue

            try:
                rossh('which ' + bin_name, nolog=True)
            except OSError:
                need_install.append(package)

        if len(need_install) == 0:
            return

        if 'redhat' == get_os(rossh):
            cmd = "sudo yum -y install " + " ".join(need_install)
        else:
            cmd = "sudo apt-get -y install " + " ".join(need_install)

        # BUGFIX: the original raised from the for/else clause using the
        # `err` name leaked out of the except block (a NameError on
        # Python 3); raise explicitly on the last failed attempt instead
        for attempt in range(max_retry):
            try:
                rossh(cmd)
                return
            except OSError as err:
                if attempt == max_retry - 1:
                    raise OSError("Can't install - " + str(err))
                time.sleep(timeout)

    def pre_run(self):
        """Prepare all nodes in parallel (folders, prefill, packages)."""
        prefill = self.config.options.get('prefill_files', True)

        if prefill:
            # collect {filename: size_in_mib} over all config sections
            files = {}
            for cfg_slice in self.fio_configs:
                for section in cfg_slice:
                    sz = ssize2b(section.vals['size'])
                    msz = sz / (1024 ** 2)

                    # round partial MiB up
                    if sz % (1024 ** 2) != 0:
                        msz += 1

                    fname = section.vals['filename']

                    # if already has other test with the same file name
                    # take largest size
                    files[fname] = max(files.get(fname, 0), msz)
        else:
            files = None
            logger.warning("Prefilling of test files is disabled")

        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
            fc = functools.partial(self.pre_run_th, files=files)
            list(pool.map(fc, self.config.nodes))

    def pre_run_th(self, node, files):
        """Prepare a single node: create the work dir, prefill files,
        install utilities."""
        rossh = run_on_node(node)

        try:
            cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
            if self.use_sudo:
                cmd = "sudo " + cmd
                # BUGFIX: was self.node.get_user() - IOPerfTest has no
                # `node` attribute; the node is the parameter here
                cmd += " ; sudo chown {0} {1}".format(node.get_user(),
                                                      self.config.remote_dir)

            rossh(cmd)
        except Exception as exc:
            msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
            # BUGFIX: was self.node.get_conn_id()
            msg = msg.format(self.config.remote_dir, node.get_conn_id(), exc)
            logger.exception(msg)
            raise StopTestError(msg, exc)

        if files is not None:
            # BUGFIX: arguments were passed as (rossh, files), swapped
            # relative to the (files, rossh) signature, which made the
            # method iterate over the ssh callable instead of the dict
            self.prefill_test_files(files, rossh)

        self.install_utils(rossh)

    def run(self):
        """Execute every compiled fio config on all nodes.

        Configs are run in order of increasing thread count; once a
        load type violates the max_latency / min_bw_per_thread limits,
        heavier variants of the same load are skipped.
        Returns the list of IOTestResult objects.
        """
        if len(self.fio_configs) > 1:
            # +10% - is a rough estimation for additional operations
            # like sftp, etc
            exec_time = int(sum(map(execution_time, self.fio_configs)) * 1.1)
            exec_time_s = sec_to_str(exec_time)
            now_dt = datetime.datetime.now()
            end_dt = now_dt + datetime.timedelta(0, exec_time)
            msg = "Entire test should takes aroud: {0} and finished at {1}"
            logger.info(msg.format(exec_time_s,
                                   end_dt.strftime("%H:%M:%S")))

        tname = os.path.basename(self.config_fname)
        if tname.endswith('.cfg'):
            tname = tname[:-4]

        barrier = Barrier(len(self.config.nodes))
        results = []

        # set of OperationModeBlockSize str
        # which should not be tested anymore, as
        # they are already too slow with previous thread count
        lat_bw_limit_reached = set()

        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
            # lightest (fewest jobs) configs first
            self.fio_configs.sort(key=lambda x: int(x.vals['numjobs']))

            for pos, fio_cfg in enumerate(self.fio_configs):
                # summary without the trailing thread-count part
                test_descr = get_test_summary(fio_cfg.vals).split("th")[0]
                if test_descr in lat_bw_limit_reached:
                    logger.info("Will skip {0} test due to lat/bw limits".format(fio_cfg.name))
                    continue
                else:
                    logger.info("Will run {0} test".format(fio_cfg.name))

                templ = "Test should takes about {0}." + \
                        " Should finish at {1}," + \
                        " will wait at most till {2}"
                exec_time = execution_time(fio_cfg)
                exec_time_str = sec_to_str(exec_time)
                # hard timeout: expected time plus at least 5 minutes
                timeout = int(exec_time + max(300, exec_time))

                now_dt = datetime.datetime.now()
                end_dt = now_dt + datetime.timedelta(0, exec_time)
                wait_till = now_dt + datetime.timedelta(0, timeout)

                logger.info(templ.format(exec_time_str,
                                         end_dt.strftime("%H:%M:%S"),
                                         wait_till.strftime("%H:%M:%S")))

                func = functools.partial(self.do_run,
                                         barrier=barrier,
                                         fio_cfg=fio_cfg,
                                         pos=pos)

                # retry the whole config on transient ssh/env failures
                max_retr = 3
                for idx in range(max_retr):
                    try:
                        intervals = list(pool.map(func, self.config.nodes))
                        break
                    except (EnvironmentError, SSHException) as exc:
                        logger.exception("During fio run")
                        if idx == max_retr - 1:
                            raise StopTestError("Fio failed", exc)

                        logger.info("Sleeping 30s and retrying")
                        time.sleep(30)

                # store the exact fio job file used for this position
                fname = "{0}_task.fio".format(pos)
                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
                    fd.write(str(fio_cfg))

                params = {'vm_count': len(self.config.nodes)}
                params['name'] = fio_cfg.name
                params['vals'] = dict(fio_cfg.vals.items())
                params['intervals'] = intervals
                params['nodes'] = [node.get_conn_id() for node in self.config.nodes]

                fname = "{0}_params.yaml".format(pos)
                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
                    fd.write(dumps(params))

                res = load_test_results(IOTestResult, self.config.log_directory, pos)
                results.append(res)

                # check the lat/bw limits to possibly skip heavier
                # variants of this load type
                test_res = res.get_params_from_fio_report()
                if self.max_latency is not None:
                    if self.max_latency < average(test_res['lat']):
                        lat_bw_limit_reached.add(test_descr)

                if self.min_bw_per_thread is not None:
                    if self.min_bw_per_thread > average(test_res['bw']):
                        lat_bw_limit_reached.add(test_descr)

        return results

    def do_run(self, node, barrier, fio_cfg, pos, nolog=False):
        """Run one fio config on one node and pull the results back.

        Returns (begin, end) unix timestamps of the run, used as the
        sensors interval.
        """
        exec_folder = os.path.dirname(self.task_file)
        bash_file = "#!/bin/bash\n" + \
                    "cd {exec_folder}\n" + \
                    "fio --output-format=json --output={out_file} " + \
                    "--alloc-size=262144 {job_file} " + \
                    " >{err_out_file} 2>&1 \n" + \
                    "echo $? >{res_code_file}\n"

        bash_file = bash_file.format(out_file=self.results_file,
                                     job_file=self.task_file,
                                     err_out_file=self.err_out_file,
                                     res_code_file=self.exit_code_file,
                                     exec_folder=exec_folder)

        run_on_node(node)("mkdir -p {0}".format(exec_folder), nolog=True)

        # safety check before the recursive remove below
        assert exec_folder != "" and exec_folder != "/"
        run_on_node(node)("rm -rf {0}/*".format(exec_folder), nolog=True)

        with node.connection.open_sftp() as sftp:
            save_to_remote(sftp, self.task_file, str(fio_cfg))
            save_to_remote(sftp, self.sh_file, bash_file)

        exec_time = execution_time(fio_cfg)

        timeout = int(exec_time + max(300, exec_time))
        soft_tout = exec_time

        begin = time.time()

        if self.use_sudo:
            sudo = "sudo "
        else:
            sudo = ""

        fnames_before = run_on_node(node)("ls -1 " + exec_folder, nolog=True)

        # start fio on all nodes simultaneously
        barrier.wait()

        task = BGSSHTask(node, self.use_sudo)
        task.start(sudo + "bash " + self.sh_file)

        # survive ssh connection drops while fio is running
        while True:
            try:
                task.wait(soft_tout, timeout)
                break
            except paramiko.SSHException:
                pass

            try:
                node.connection.close()
            except Exception:  # was a bare except; don't eat SystemExit
                pass

            reconnect(node.connection, node.conn_url)

        end = time.time()
        rossh = run_on_node(node)
        fnames_after = rossh("ls -1 " + exec_folder, nolog=True)

        conn_id = node.get_conn_id().replace(":", "_")
        if not nolog:
            logger.debug("Test on node {0} is finished".format(conn_id))

        # expected log-file prefixes, based on the fio job options
        log_files_pref = []
        if 'write_lat_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_lat_log']
            log_files_pref.append(fname + '_clat')
            log_files_pref.append(fname + '_lat')
            log_files_pref.append(fname + '_slat')

        if 'write_iops_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_iops_log']
            log_files_pref.append(fname + '_iops')

        if 'write_bw_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_bw_log']
            log_files_pref.append(fname + '_bw')

        # group new log files as {metric_type: [(thread_num, fname)]}
        files = collections.defaultdict(lambda: [])
        all_files = [os.path.basename(self.results_file)]
        new_files = set(fnames_after.split()) - set(fnames_before.split())
        for fname in new_files:
            if fname.endswith('.log') and fname.split('.')[0] in log_files_pref:
                name, _ = os.path.splitext(fname)
                if fname.count('.') == 1:
                    # single-thread log: "prefix_type.log"
                    tp = name.split("_")[-1]
                    cnt = 0
                else:
                    # per-thread log: "prefix_type.NN.log"
                    tp_cnt = name.split("_")[-1]
                    tp, cnt = tp_cnt.split('.')
                files[tp].append((int(cnt), fname))
                all_files.append(fname)

        arch_name = self.join_remote('wally_result.tar.gz')
        tmp_dir = os.path.join(self.config.log_directory, 'tmp_' + conn_id)
        os.mkdir(tmp_dir)
        loc_arch_name = os.path.join(tmp_dir, 'wally_result.{0}.tar.gz'.format(conn_id))
        file_full_names = " ".join(all_files)

        try:
            os.unlink(loc_arch_name)
        except OSError:  # was a bare except; unlink only raises OSError
            pass

        with node.connection.open_sftp() as sftp:
            exit_code = read_from_remote(sftp, self.exit_code_file)
            err_out = read_from_remote(sftp, self.err_out_file)
            exit_code = exit_code.strip()

            if exit_code != '0':
                msg = "fio exit with code {0}: {1}".format(exit_code, err_out)
                logger.critical(msg.strip())
                raise StopTestError("fio failed")

            # pack everything remotely and download a single archive
            rossh("rm -f {0}".format(arch_name), nolog=True)
            cmd = "cd {0} ; tar zcvf {1} {2}".format(exec_folder, arch_name, file_full_names)
            rossh(cmd, nolog=True)
            sftp.get(arch_name, loc_arch_name)

        cmd = "cd {0} ; tar xvzf {1} >/dev/null".format(tmp_dir, loc_arch_name)
        subprocess.check_call(cmd, shell=True)
        os.unlink(loc_arch_name)

        # move the logs into the log directory under canonical names:
        # {pos}_{conn_id}_{type}.{thread}.log
        for ftype, fls in files.items():
            for idx, fname in fls:
                cname = os.path.join(tmp_dir, fname)
                loc_fname = "{0}_{1}_{2}.{3}.log".format(pos, conn_id, ftype, idx)
                loc_path = os.path.join(self.config.log_directory, loc_fname)
                os.rename(cname, loc_path)

        cname = os.path.join(tmp_dir,
                             os.path.basename(self.results_file))
        loc_fname = "{0}_{1}_rawres.json".format(pos, conn_id)
        loc_path = os.path.join(self.config.log_directory, loc_fname)
        os.rename(cname, loc_path)

        os.rmdir(tmp_dir)
        return begin, end

    @classmethod
    def format_for_console(cls, results):
        """
        create a table with io performance report
        for console
        """

        def getconc(data):
            # thread count: numjobs, falling back to 'concurence'
            th_count = data.params['vals'].get('numjobs')

            if th_count is None:
                th_count = data.params['vals'].get('concurence', 1)
            return th_count

        def key_func(data):
            # sort key grouping equal (test, rw, sync mode, bs) runs
            p = data.params['vals']

            th_count = getconc(data)

            return (data.name.rsplit("_", 1)[0],
                    p['rw'],
                    get_test_sync_mode(data.params['vals']),
                    ssize2b(p['blocksize']),
                    int(th_count) * len(data.config.nodes))

        tab = texttable.Texttable(max_width=120)
        tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)

        header = ["Name", "Description", "iops\ncum", "KiBps\ncum",
                  "Cnf\n95%", "Dev%", "iops\nper vm", "KiBps\nper vm", "lat\nms"]
        tab.set_cols_align(["l", "l"] + ['r'] * (len(header) - 2))
        sep = ["-------", "-----------"] + ["---"] * (len(header) - 2)
        tab.header(header)

        prev_k = None
        for item in sorted(results, key=key_func):
            # separator row between different (test, rw, mode, bs) groups
            curr_k = key_func(item)[:4]
            if prev_k is not None:
                if prev_k != curr_k:
                    tab.add_row(sep)

            prev_k = curr_k

            test_dinfo = item.disk_perf_info()

            iops, _ = test_dinfo.iops.rounded_average_conf()

            bw, bw_conf = test_dinfo.bw.rounded_average_conf()
            _, bw_dev = test_dinfo.bw.rounded_average_dev()
            conf_perc = int(round(bw_conf * 100 / bw))
            dev_perc = int(round(bw_dev * 100 / bw))

            lat, _ = test_dinfo.lat.rounded_average_conf()
            # lat series is in microseconds, report in ms
            lat = round_3_digit(int(lat) // 1000)

            testnodes_count = len(item.config.nodes)
            iops_per_vm = round_3_digit(iops / testnodes_count)
            bw_per_vm = round_3_digit(bw / testnodes_count)

            iops = round_3_digit(iops)
            # iops_from_lat = round_3_digit(iops_from_lat)
            bw = round_3_digit(bw)

            params = (item.name.rsplit('_', 1)[0],
                      item.summary(),
                      int(iops),
                      int(bw),
                      str(conf_perc),
                      str(dev_perc),
                      int(iops_per_vm),
                      int(bw_per_vm),
                      lat)
            tab.add_row(params)

        return tab.draw()