import re
import time
import json
import random
import os.path
import logging
import datetime
import functools
import subprocess
import collections

import yaml
import paramiko
import texttable
from paramiko.ssh_exception import SSHException
from concurrent.futures import ThreadPoolExecutor

import wally
from wally.pretty_yaml import dumps
from wally.statistic import round_3_digit, data_property, average
from wally.utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask, reconnect)

from .fio_task_parser import (execution_time, fio_cfg_compile,
                              get_test_summary, get_test_sync_mode, FioJobSection)
from ..itest import (TimeSeriesValue, PerfTest, TestResults,
                     run_on_node, TestConfig, MeasurementMatrix)

logger = logging.getLogger("wally")


# Results folder structure:
# results/
#     {loadtype}_{num}/
#         config.yaml
#         ......


class NoData(object):
    pass


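# Property decorator that computes the value once and caches it in the
# matching "_<name>" attribute (expected to be pre-initialized to NoData).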
def cached_prop(func):
    @property
    @functools.wraps(func)
    def closure(self):
        val = getattr(self, "_" + func.__name__)
        if val is NoData:
            val = func(self)
            setattr(self, "_" + func.__name__, val)
        return val
    return closure


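# Parse a fio per-interval log (CSV lines of "time, value, ...") into a
# TimeSeriesValue, keeping only the first two columns of every line.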
def load_fio_log_file(fname):
    with open(fname) as fd:
        it = [ln.split(',')[:2] for ln in fd]

    vals = [(float(off) / 1000,  # convert us to ms
             float(val.strip()) + 0.5)  # add 0.5 to compensate the average value,
            # as fio truncates all values in the log to integers
            for off, val in it]

    return TimeSeriesValue(vals)


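# Rebuild a result object of class `cls` from the on-disk artifacts of one run:
# "<run>_params.yaml", the per-connection fio log files and the
# "<run>_<conn>_rawres.json" reports stored in `folder`.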
def load_test_results(cls, folder, run_num):
    res = {}
    params = None

    fn = os.path.join(folder, str(run_num) + '_params.yaml')
    params = yaml.load(open(fn).read())

    conn_ids_set = set()
    rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
    for fname in os.listdir(folder):
        rm = re.match(rr, fname)
        if rm is None:
            continue

        conn_id_s = rm.group('conn_id')
        conn_id = conn_id_s.replace('_', ':')
        ftype = rm.group('type')

        if ftype not in ('iops', 'bw', 'lat'):
            continue

        ts = load_fio_log_file(os.path.join(folder, fname))
        res.setdefault(ftype, {}).setdefault(conn_id, []).append(ts)

        conn_ids_set.add(conn_id)

    mm_res = {}

    for key, data in res.items():
        conn_ids = sorted(conn_ids_set)
        matr = [data[conn_id] for conn_id in conn_ids]

        mm_res[key] = MeasurementMatrix(matr, conn_ids)

    # iops_from_lat_matr = []
    # for node_ts in mm_res['lat'].data:
    #     iops_from_lat_matr.append([])
    #     for thread_ts in node_ts:
    #         ndt = [(start + ln, 1000000. / val)
    #                for (start, ln, val) in thread_ts.data]
    #         new_ts = TimeSeriesValue(ndt)
    #         iops_from_lat_matr[-1].append(new_ts)

    # mm_res['iops_from_lat'] = MeasurementMatrix(iops_from_lat_matr, conn_ids)

    raw_res = {}
    for conn_id in conn_ids:
        # rebuild the underscore-separated id used in the raw result file name
        conn_id_s = conn_id.replace(':', '_')
        fn = os.path.join(folder, "{0}_{1}_rawres.json".format(run_num, conn_id_s))

        # remove message hack
        fc = "{" + open(fn).read().split('{', 1)[1]
        raw_res[conn_id] = json.loads(fc)

    fio_task = FioJobSection(params['name'])
    fio_task.vals.update(params['vals'])

    config = TestConfig('io', params, None, params['nodes'], folder, None)
    return cls(config, fio_task, mm_res, raw_res, params['intervals'])


class Attrmapper(object):
    def __init__(self, dct):
        self.__dct = dct

    def __getattr__(self, name):
        try:
            return self.__dct[name]
        except KeyError:
            raise AttributeError(name)


class DiskPerfInfo(object):
    def __init__(self, name, summary, params, testnodes_count):
        self.name = name
        self.bw = None
        self.iops = None
        self.lat = None
        self.lat_50 = None
        self.lat_95 = None

        self.raw_bw = []
        self.raw_iops = []
        self.raw_lat = []

        self.params = params
        self.testnodes_count = testnodes_count
        self.summary = summary
        self.p = Attrmapper(self.params['vals'])

        self.sync_mode = get_test_sync_mode(self.params['vals'])
        self.concurence = self.params['vals'].get('numjobs', 1)


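# Estimate the 50th and 95th percentiles from a {latency_us_bucket: percent}
# histogram, interpolating linearly between buckets; results are returned in ms.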
def get_lat_perc_50_95(lat_mks):
    curr_perc = 0
    perc_50 = None
    perc_95 = None
    pkey = None
    for key, val in sorted(lat_mks.items()):
        if curr_perc + val >= 50 and perc_50 is None:
            if pkey is None or val < 1.:
                perc_50 = key
            else:
                perc_50 = (50. - curr_perc) / val * (key - pkey) + pkey

        if curr_perc + val >= 95:
            if pkey is None or val < 1.:
                perc_95 = key
            else:
                perc_95 = (95. - curr_perc) / val * (key - pkey) + pkey
            break

        pkey = key
        curr_perc += val

    return perc_50 / 1000., perc_95 / 1000.


class IOTestResult(TestResults):
    """
    Fio run results
    config: TestConfig
    fio_task: FioJobSection
    ts_results: {str: MeasurementMatrix[TimeSeriesValue]}
    raw_result: ????
    run_interval: (float, float) - test run time, used for sensors
    """
    def __init__(self, config, fio_task, ts_results, raw_result, run_interval):

        self.name = fio_task.name.split("_")[0]
        self.fio_task = fio_task

        self.bw = ts_results.get('bw')
        self.lat = ts_results.get('lat')
        self.iops = ts_results.get('iops')
        # self.iops_from_lat = ts_results.get('iops_from_lat')

        # self.slat = drop_warmup(res.get('clat', None), self.params)
        # self.clat = drop_warmup(res.get('slat', None), self.params)

        res = {"bw": self.bw, "lat": self.lat, "iops": self.iops}

        self.sensors_data = None
        self._pinfo = None
        TestResults.__init__(self, config, res, raw_result, run_interval)

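    # Pull per-node iops/bw out of the raw fio JSON report: both the values fio
    # reports directly and values recomputed from total_ios/io_bytes over runtime.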
    def get_params_from_fio_report(self):
        nodes = self.bw.connections_ids

        iops = [self.raw_result[node]['jobs'][0]['mixed']['iops'] for node in nodes]
        total_ios = [self.raw_result[node]['jobs'][0]['mixed']['total_ios'] for node in nodes]
        runtime = [self.raw_result[node]['jobs'][0]['mixed']['runtime'] / 1000 for node in nodes]
        flt_iops = [float(ios) / rtime for ios, rtime in zip(total_ios, runtime)]

        bw = [self.raw_result[node]['jobs'][0]['mixed']['bw'] for node in nodes]
        total_bytes = [self.raw_result[node]['jobs'][0]['mixed']['io_bytes'] for node in nodes]
        flt_bw = [float(tbytes) / rtime for tbytes, rtime in zip(total_bytes, runtime)]

        return {'iops': iops,
                'flt_iops': flt_iops,
                'bw': bw,
                'flt_bw': flt_bw}

    def summary(self):
        return get_test_summary(self.fio_task) + "vm" \
               + str(len(self.config.nodes))

    def get_yamable(self):
        return self.summary()

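    # Merge the latency histograms of all jobs on all nodes and return the
    # aggregated 50%/95% percentiles (in ms) via get_lat_perc_50_95().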
    def get_lat_perc_50_95_multy(self):
        lat_mks = collections.defaultdict(lambda: 0)
        num_res = 0

        for result in self.raw_result.values():
            num_res += len(result['jobs'])
            for job_info in result['jobs']:
                for k, v in job_info['latency_ms'].items():
                    if isinstance(k, basestring) and k.startswith('>='):
                        lat_mks[int(k[2:]) * 1000] += v
                    else:
                        lat_mks[int(k) * 1000] += v

                for k, v in job_info['latency_us'].items():
                    lat_mks[int(k)] += v

        for k, v in lat_mks.items():
            lat_mks[k] = float(v) / num_res
        return get_lat_perc_50_95(lat_mks)

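    # Aggregate the per-interval log data and the fio report into a DiskPerfInfo,
    # rescaling the log-based averages to match the fio-reported totals.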
    def disk_perf_info(self, avg_interval=2.0):

        if self._pinfo is not None:
            return self._pinfo

        testnodes_count = len(self.config.nodes)

        pinfo = DiskPerfInfo(self.name,
                             self.summary(),
                             self.params,
                             testnodes_count)

        # ramp_time = self.fio_task.vals.get('ramp_time', 0)

        def prepare(data, drop=1):
            if data is None:
                return data

            res = []
            for ts_data in data:
                # if ramp_time > 0:
                #     ts_data = ts_data.skip(ramp_time)

                if ts_data.average_interval() < avg_interval:
                    ts_data = ts_data.derived(avg_interval)

                # drop the last values on the bounds,
                # as they may contain ranges without activity
                assert len(ts_data.values) >= drop + 1

                if drop > 0:
                    res.append(ts_data.values[:-drop])
                else:
                    res.append(ts_data.values)

            return res

        def agg_data(matr):
            arr = sum(matr, [])
            min_len = min(map(len, arr))
            res = []
            for idx in range(min_len):
                res.append(sum(dt[idx] for dt in arr))
            return res

        # pinfo.raw_lat = map(prepare, self.lat.per_vm())
        # num_th = sum(map(len, pinfo.raw_lat))
        # avg_lat = [val / num_th for val in agg_data(pinfo.raw_lat)]
        # pinfo.lat = data_property(avg_lat)
        pinfo.lat_50, pinfo.lat_95 = self.get_lat_perc_50_95_multy()
        pinfo.lat = pinfo.lat_50

        pinfo.raw_bw = map(prepare, self.bw.per_vm())
        pinfo.raw_iops = map(prepare, self.iops.per_vm())

        iops_per_th = sum(sum(pinfo.raw_iops, []), [])

        fparams = self.get_params_from_fio_report()
        fio_report_bw = sum(fparams['flt_bw'])
        fio_report_iops = sum(fparams['flt_iops'])

        agg_bw = agg_data(pinfo.raw_bw)
        agg_iops = agg_data(pinfo.raw_iops)

        log_bw_avg = average(agg_bw)
        log_iops_avg = average(agg_iops)

        # update values to match average from fio report
        coef_iops = fio_report_iops / float(log_iops_avg)
        coef_bw = fio_report_bw / float(log_bw_avg)

        bw_log = data_property([val * coef_bw for val in agg_bw])
        iops_log = data_property([val * coef_iops for val in agg_iops])

        bw_report = data_property([fio_report_bw])
        iops_report = data_property([fio_report_iops])

        # When IOPS/BW per thread is too low, the per-interval log data is
        # rounded too coarsely, so prefer the values from the fio report
        if average(iops_per_th) > 10:
            pinfo.bw = bw_log
            pinfo.iops = iops_log
            pinfo.bw2 = bw_report
            pinfo.iops2 = iops_report
        else:
            pinfo.bw = bw_report
            pinfo.iops = iops_report
            pinfo.bw2 = bw_log
            pinfo.iops2 = iops_log

        self._pinfo = pinfo

        return pinfo


class IOPerfTest(PerfTest):
    tcp_conn_timeout = 30
    max_pig_timeout = 5
    soft_runcycle = 5 * 60

    def __init__(self, config):
        PerfTest.__init__(self, config)

        get = self.config.params.get
        do_get = self.config.params.__getitem__

        self.config_fname = do_get('cfg')

        if '/' not in self.config_fname and '.' not in self.config_fname:
            cfgs_dir = os.path.dirname(__file__)
            self.config_fname = os.path.join(cfgs_dir,
                                             self.config_fname + '.cfg')

        self.alive_check_interval = get('alive_check_interval')
        self.use_system_fio = get('use_system_fio', False)

        if get('prefill_files') is not None:
            logger.warning("prefill_files option is deprecated. Use force_prefill instead")

        self.force_prefill = get('force_prefill', False)
        self.config_params = get('params', {}).copy()

        self.io_py_remote = self.join_remote("agent.py")
        self.results_file = self.join_remote("results.json")
        self.pid_file = self.join_remote("pid")
        self.task_file = self.join_remote("task.cfg")
        self.sh_file = self.join_remote("cmd.sh")
        self.err_out_file = self.join_remote("fio_err_out")
        self.exit_code_file = self.join_remote("exit_code")

        self.max_latency = get("max_lat", None)
        self.min_bw_per_thread = get("min_bw", None)

        self.use_sudo = get("use_sudo", True)
        self.test_logging = get("test_logging", False)

        self.raw_cfg = open(self.config_fname).read()
        self.fio_configs = fio_cfg_compile(self.raw_cfg,
                                           self.config_fname,
                                           self.config_params,
                                           split_on_names=self.test_logging)
        self.fio_configs = list(self.fio_configs)

    @classmethod
    def load(cls, folder):
        for fname in os.listdir(folder):
            if re.match(r"\d+_params.yaml$", fname):
                num = int(fname.split('_')[0])
                yield load_test_results(IOTestResult, folder, num)

    def cleanup(self):
        # delete_file(conn, self.io_py_remote)
        # Need to remove temporary files used for testing
        pass

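    # Heuristic prefill check: read num_blocks random 1 KiB chunks of the file
    # remotely and compare their md5 against a hard-coded digest (presumably of
    # an unwritten chunk); a match means the file still needs prefilling.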
    def check_prefill_required(self, rossh, fname, size, num_blocks=16):
        cmd = """python -c "import sys; fd = open('{0}', 'rb');""" + \
              """fd.seek({1}); sys.stdout.write(fd.read(1024))" | md5sum"""

        if self.use_sudo:
            cmd = "sudo " + cmd

        zero_md5 = '54ac58cc1e2711a1a3d88bce15bb152d'

        for _ in range(num_blocks):
            offset = random.randrange(size * 1024)
            data = rossh(cmd.format(fname, offset), nolog=True)
            if zero_md5 == data.split()[0].strip():
                return True
        return False

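    # Fill the test files with data by running a sequential fio write job for
    # every file that still looks unfilled (or for all files when force=True)
    # and log the resulting fill bandwidth.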
    def prefill_test_files(self, rossh, files, force=False):
        if self.use_system_fio:
            cmd_templ = "fio "
        else:
            cmd_templ = "{0}/fio ".format(self.config.remote_dir)

        if self.use_sudo:
            cmd_templ = "sudo " + cmd_templ

        cmd_templ += "--name=xxx --filename={0} --direct=1" + \
                     " --bs=4m --size={1}m --rw=write"

        ssize = 0

        if force:
            logger.info("File prefilling is forced")

        ddtime = 0
        for fname, curr_sz in files.items():
            if not force:
                if not self.check_prefill_required(rossh, fname, curr_sz):
                    continue

            logger.info("Prefilling file {0}".format(fname))
            cmd = cmd_templ.format(fname, curr_sz)
            ssize += curr_sz

            stime = time.time()
            rossh(cmd, timeout=curr_sz)
            ddtime += time.time() - stime

        if ddtime > 1.0:
            fill_bw = int(ssize / ddtime)
            mess = "Initial fio fill bw is {0} MiBps for this vm"
            logger.info(mess.format(fill_bw))

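    # Make sure the required tools are present on the node: install missing
    # packages via yum/apt and, unless the system fio is used, upload and
    # unpack a prebuilt fio binary matching the node OS.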
    def install_utils(self, node, rossh, max_retry=3, timeout=5):
        need_install = []
        packs = [('screen', 'screen')]
        os_info = get_os(rossh)

        if self.use_system_fio:
            packs.append(('fio', 'fio'))
        else:
            packs.append(('bzip2', 'bzip2'))

        for bin_name, package in packs:
            if bin_name is None:
                need_install.append(package)
                continue

            try:
                rossh('which ' + bin_name, nolog=True)
            except OSError:
                need_install.append(package)

        if len(need_install) != 0:
            if 'redhat' == os_info.distro:
                cmd = "sudo yum -y install " + " ".join(need_install)
            else:
                cmd = "sudo apt-get -y install " + " ".join(need_install)

            for _ in range(max_retry):
                try:
                    rossh(cmd)
                    break
                except OSError as err:
                    time.sleep(timeout)
            else:
                raise OSError("Can't install - " + str(err))

        if not self.use_system_fio:
            fio_dir = os.path.dirname(os.path.dirname(wally.__file__))
            fio_dir = os.path.join(os.getcwd(), fio_dir)
            fio_dir = os.path.join(fio_dir, 'fio_binaries')
            fname = 'fio_{0.release}_{0.arch}.bz2'.format(os_info)
            fio_path = os.path.join(fio_dir, fname)

            if not os.path.exists(fio_path):
                raise RuntimeError("No prebuilt fio available for {0}".format(os_info))

            bz_dest = self.join_remote('fio.bz2')
            with node.connection.open_sftp() as sftp:
                sftp.put(fio_path, bz_dest)

            rossh("bzip2 --decompress " + bz_dest, nolog=True)
            rossh("chmod a+x " + self.join_remote("fio"), nolog=True)

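    # Compute the required size (in MiB) of every test file across all fio
    # sections, then prepare each node in parallel via pre_run_th().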
    def pre_run(self):
        files = {}
        for section in self.fio_configs:
            sz = ssize2b(section.vals['size'])
            msz = sz / (1024 ** 2)

            if sz % (1024 ** 2) != 0:
                msz += 1

            fname = section.vals['filename']

            # if another test already uses the same file name,
            # take the largest size
            files[fname] = max(files.get(fname, 0), msz)

        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
            fc = functools.partial(self.pre_run_th,
                                   files=files,
                                   force=self.force_prefill)
            list(pool.map(fc, self.config.nodes))

    def pre_run_th(self, node, files, force):
        # fill files with pseudo-random data
        rossh = run_on_node(node)

        try:
            cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
            if self.use_sudo:
                cmd = "sudo " + cmd
                cmd += " ; sudo chown {0} {1}".format(node.get_user(),
                                                      self.config.remote_dir)
            rossh(cmd, nolog=True)

            assert self.config.remote_dir != "" and self.config.remote_dir != "/"
            rossh("rm -rf {0}/*".format(self.config.remote_dir), nolog=True)

        except Exception as exc:
            msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
            msg = msg.format(self.config.remote_dir, node.get_conn_id(), exc)
            logger.exception(msg)
            raise StopTestError(msg, exc)

        self.install_utils(node, rossh)
        self.prefill_test_files(rossh, files, force)

    def show_test_execution_time(self):
        if len(self.fio_configs) > 1:
            # +10% is a rough estimation for additional operations
            # like sftp, etc.
            exec_time = int(sum(map(execution_time, self.fio_configs)) * 1.1)
            exec_time_s = sec_to_str(exec_time)
            now_dt = datetime.datetime.now()
            end_dt = now_dt + datetime.timedelta(0, exec_time)
            msg = "Entire test should take around {0} and finish at {1}"
            logger.info(msg.format(exec_time_s,
                                   end_dt.strftime("%H:%M:%S")))

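    # Main loop: run every compiled fio config on all nodes in parallel,
    # persist the per-run params/raw results, and stop testing configurations
    # whose latency/bandwidth already exceeded the configured limits.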
    def run(self):
        logger.debug("Run preparation")
        self.pre_run()
        self.show_test_execution_time()

        tname = os.path.basename(self.config_fname)
        if tname.endswith('.cfg'):
            tname = tname[:-4]

        barrier = Barrier(len(self.config.nodes))
        results = []

        # set of Operation_Mode_BlockSize strings
        # which should not be tested anymore, as
        # they were already too slow with a smaller thread count
        lat_bw_limit_reached = set()

        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
            self.fio_configs.sort(key=lambda x: int(x.vals['numjobs']))

            for pos, fio_cfg in enumerate(self.fio_configs):
                test_descr = get_test_summary(fio_cfg.vals).split("th")[0]
                if test_descr in lat_bw_limit_reached:
                    continue
                else:
                    logger.info("Will run {0} test".format(fio_cfg.name))

                templ = "Test should take about {0}." + \
                        " Should finish at {1}," + \
                        " will wait at most till {2}"
                exec_time = execution_time(fio_cfg)
                exec_time_str = sec_to_str(exec_time)
                timeout = int(exec_time + max(300, exec_time))

                now_dt = datetime.datetime.now()
                end_dt = now_dt + datetime.timedelta(0, exec_time)
                wait_till = now_dt + datetime.timedelta(0, timeout)

                logger.info(templ.format(exec_time_str,
                                         end_dt.strftime("%H:%M:%S"),
                                         wait_till.strftime("%H:%M:%S")))

                func = functools.partial(self.do_run,
                                         barrier=barrier,
                                         fio_cfg=fio_cfg,
                                         pos=pos)

                max_retr = 3
                for idx in range(max_retr):
                    try:
                        intervals = list(pool.map(func, self.config.nodes))
                        break
                    except (EnvironmentError, SSHException) as exc:
                        logger.exception("During fio run")
                        if idx == max_retr - 1:
                            raise StopTestError("Fio failed", exc)

                        logger.info("Sleeping 30s and retrying")
                        time.sleep(30)

                fname = "{0}_task.fio".format(pos)
                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
                    fd.write(str(fio_cfg))

                params = {'vm_count': len(self.config.nodes)}
                params['name'] = fio_cfg.name
                params['vals'] = dict(fio_cfg.vals.items())
                params['intervals'] = intervals
                params['nodes'] = [node.get_conn_id() for node in self.config.nodes]

                fname = "{0}_params.yaml".format(pos)
                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
                    fd.write(dumps(params))

                res = load_test_results(IOTestResult, self.config.log_directory, pos)
                results.append(res)

                if self.max_latency is not None:
                    lat_50, _ = res.get_lat_perc_50_95_multy()

                    # convert us to ms
                    if self.max_latency < lat_50:
                        logger.info(("Will skip all subsequent tests of {0} " +
                                     "due to lat/bw limits").format(fio_cfg.name))
                        lat_bw_limit_reached.add(test_descr)

                test_res = res.get_params_from_fio_report()
                if self.min_bw_per_thread is not None:
                    if self.min_bw_per_thread > average(test_res['bw']):
                        lat_bw_limit_reached.add(test_descr)

        return results

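    # Run a single fio config on one node: upload the job file and a wrapper
    # script, execute it through BGSSHTask, then archive and download the logs
    # and the JSON report into the local log directory. Returns (begin, end).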
    def do_run(self, node, barrier, fio_cfg, pos, nolog=False):
        if self.use_sudo:
            sudo = "sudo "
        else:
            sudo = ""

        bash_file = "#!/bin/bash\n" + \
                    "cd {exec_folder}\n" + \
                    "{fio_path}fio --output-format=json --output={out_file} " + \
                    "--alloc-size=262144 {job_file} " + \
                    " >{err_out_file} 2>&1 \n" + \
                    "echo $? >{res_code_file}\n"

        exec_folder = self.config.remote_dir

        if self.use_system_fio:
            fio_path = ""
        else:
            if not exec_folder.endswith("/"):
                fio_path = exec_folder + "/"
            else:
                fio_path = exec_folder

        bash_file = bash_file.format(out_file=self.results_file,
                                     job_file=self.task_file,
                                     err_out_file=self.err_out_file,
                                     res_code_file=self.exit_code_file,
                                     exec_folder=exec_folder,
                                     fio_path=fio_path)

        with node.connection.open_sftp() as sftp:
            save_to_remote(sftp, self.task_file, str(fio_cfg))
            save_to_remote(sftp, self.sh_file, bash_file)

        exec_time = execution_time(fio_cfg)

        timeout = int(exec_time + max(300, exec_time))
        soft_tout = exec_time

        begin = time.time()

        fnames_before = run_on_node(node)("ls -1 " + exec_folder, nolog=True)

        barrier.wait()

        task = BGSSHTask(node, self.use_sudo)
        task.start(sudo + "bash " + self.sh_file)

        while True:
            try:
                task.wait(soft_tout, timeout)
                break
            except paramiko.SSHException:
                pass

            try:
                node.connection.close()
            except:
                pass

            reconnect(node.connection, node.conn_url)

        end = time.time()
        rossh = run_on_node(node)
        fnames_after = rossh("ls -1 " + exec_folder, nolog=True)

        conn_id = node.get_conn_id().replace(":", "_")
        if not nolog:
            logger.debug("Test on node {0} is finished".format(conn_id))

        log_files_pref = []
        if 'write_lat_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_lat_log']
            log_files_pref.append(fname + '_clat')
            log_files_pref.append(fname + '_lat')
            log_files_pref.append(fname + '_slat')

        if 'write_iops_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_iops_log']
            log_files_pref.append(fname + '_iops')

        if 'write_bw_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_bw_log']
            log_files_pref.append(fname + '_bw')

        files = collections.defaultdict(lambda: [])
        all_files = [os.path.basename(self.results_file)]
        new_files = set(fnames_after.split()) - set(fnames_before.split())

        for fname in new_files:
            if fname.endswith('.log') and fname.split('.')[0] in log_files_pref:
                name, _ = os.path.splitext(fname)
                if fname.count('.') == 1:
                    tp = name.split("_")[-1]
                    cnt = 0
                else:
                    tp_cnt = name.split("_")[-1]
                    tp, cnt = tp_cnt.split('.')
                files[tp].append((int(cnt), fname))
                all_files.append(fname)

        arch_name = self.join_remote('wally_result.tar.gz')
        tmp_dir = os.path.join(self.config.log_directory, 'tmp_' + conn_id)
        os.mkdir(tmp_dir)
        loc_arch_name = os.path.join(tmp_dir, 'wally_result.{0}.tar.gz'.format(conn_id))
        file_full_names = " ".join(all_files)

        try:
            os.unlink(loc_arch_name)
        except:
            pass

        with node.connection.open_sftp() as sftp:
            exit_code = read_from_remote(sftp, self.exit_code_file)
            err_out = read_from_remote(sftp, self.err_out_file)
            exit_code = exit_code.strip()

            if exit_code != '0':
                msg = "fio exited with code {0}: {1}".format(exit_code, err_out)
                logger.critical(msg.strip())
                raise StopTestError("fio failed")

            rossh("rm -f {0}".format(arch_name), nolog=True)
            pack_files_cmd = "cd {0} ; tar zcvf {1} {2}".format(exec_folder, arch_name, file_full_names)
            rossh(pack_files_cmd, nolog=True)
            sftp.get(arch_name, loc_arch_name)

        unpack_files_cmd = "cd {0} ; tar xvzf {1} >/dev/null".format(tmp_dir, loc_arch_name)
        subprocess.check_call(unpack_files_cmd, shell=True)
        os.unlink(loc_arch_name)

        for ftype, fls in files.items():
            for idx, fname in fls:
                cname = os.path.join(tmp_dir, fname)
                loc_fname = "{0}_{1}_{2}.{3}.log".format(pos, conn_id, ftype, idx)
                loc_path = os.path.join(self.config.log_directory, loc_fname)
                os.rename(cname, loc_path)

        cname = os.path.join(tmp_dir,
                             os.path.basename(self.results_file))
        loc_fname = "{0}_{1}_rawres.json".format(pos, conn_id)
        loc_path = os.path.join(self.config.log_directory, loc_fname)
        os.rename(cname, loc_path)
        os.rmdir(tmp_dir)

        remove_remote_res_files_cmd = "cd {0} ; rm -f {1} {2}".format(exec_folder,
                                                                      arch_name,
                                                                      file_full_names)
        rossh(remove_remote_res_files_cmd, nolog=True)
        return begin, end

    @classmethod
    def format_for_console(cls, results):
        """
        create a table with io performance report
        for console
        """

        def getconc(data):
            th_count = data.params['vals'].get('numjobs')

            if th_count is None:
                th_count = data.params['vals'].get('concurence', 1)
            return th_count

        def key_func(data):
            p = data.params['vals']

            th_count = getconc(data)

            return (data.name.rsplit("_", 1)[0],
                    p['rw'],
                    get_test_sync_mode(data.params['vals']),
                    ssize2b(p['blocksize']),
                    int(th_count) * len(data.config.nodes))

        tab = texttable.Texttable(max_width=120)
        tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)

        header = ["Name", "Description", "iops\ncum", "KiBps\ncum",
                  "Cnf\n95%", "Dev%", "iops\nper vm", "KiBps\nper vm", "lat ms\nmedian"]
        tab.set_cols_align(["l", "l"] + ['r'] * (len(header) - 2))
        sep = ["-------", "-----------"] + ["---"] * (len(header) - 2)
        tab.header(header)

        prev_k = None
        for item in sorted(results, key=key_func):
            curr_k = key_func(item)[:4]
            if prev_k is not None:
                if prev_k != curr_k:
                    tab.add_row(sep)

            prev_k = curr_k

            test_dinfo = item.disk_perf_info()

            iops, _ = test_dinfo.iops.rounded_average_conf()

            bw, bw_conf = test_dinfo.bw.rounded_average_conf()
            _, bw_dev = test_dinfo.bw.rounded_average_dev()
            conf_perc = int(round(bw_conf * 100 / bw))
            dev_perc = int(round(bw_dev * 100 / bw))

            lat = round_3_digit(int(test_dinfo.lat))

            testnodes_count = len(item.config.nodes)
            iops_per_vm = round_3_digit(iops / testnodes_count)
            bw_per_vm = round_3_digit(bw / testnodes_count)

            iops = round_3_digit(iops)
            # iops_from_lat = round_3_digit(iops_from_lat)
            bw = round_3_digit(bw)

            params = (item.name.rsplit('_', 1)[0],
                      item.summary(),
                      int(iops),
                      int(bw),
                      str(conf_perc),
                      str(dev_perc),
                      int(iops_per_vm),
                      int(bw_per_vm),
                      lat)
            tab.add_row(params)

        return tab.draw()