blob: b85bbb1dace85b84337d4bd7418479249860f139 [file] [log] [blame]
koder aka kdanilovbc2c8982015-06-13 02:50:43 +03001import re
2import time
3import json
koder aka kdanilov6ab4d432015-06-22 00:26:28 +03004import random
koder aka kdanilovbc2c8982015-06-13 02:50:43 +03005import os.path
6import logging
7import datetime
8import functools
9import subprocess
10import collections
11
12import yaml
13import paramiko
14import texttable
15from paramiko.ssh_exception import SSHException
16from concurrent.futures import ThreadPoolExecutor
17
koder aka kdanilov6ab4d432015-06-22 00:26:28 +030018import wally
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030019from wally.pretty_yaml import dumps
koder aka kdanilovbb6d6cd2015-06-20 02:55:07 +030020from wally.statistic import round_3_digit, data_property, average
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030021from wally.utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
22from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask, reconnect)
23
24from .fio_task_parser import (execution_time, fio_cfg_compile,
koder aka kdanilovbb6d6cd2015-06-20 02:55:07 +030025 get_test_summary, get_test_sync_mode, FioJobSection)
26from ..itest import (TimeSeriesValue, PerfTest, TestResults,
27 run_on_node, TestConfig, MeasurementMatrix)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030028
# Single shared logger for the whole wally package (configured elsewhere).
logger = logging.getLogger("wally")


# On-disk layout of saved results, consumed by load_test_results()/load():
# results/
#     {loadtype}_{num}/
#         config.yaml
#         ......
38
class NoData(object):
    """Sentinel for "value not computed yet" (see cached_prop).

    The class object itself serves as the marker, so identity tests
    (``val is NoData``) distinguish it from any real value, including None.
    """
41
42
def cached_prop(func):
    """Property decorator that memoizes its result on the instance.

    The owning class must pre-set ``self._<name>`` to the NoData sentinel;
    the first read stores the computed value there and later reads return
    it without calling *func* again.
    """
    @property
    @functools.wraps(func)
    def getter(self):
        cached = getattr(self, "_" + func.__name__)
        if cached is NoData:
            cached = func(self)
            setattr(self, "_" + func.__name__, cached)
        return cached

    return getter
53
54
def load_fio_log_file(fname):
    """Parse a fio log file into a TimeSeriesValue.

    Each line starts with "offset_us,value,...". Offsets are converted from
    microseconds to milliseconds; 0.5 is added to every value to compensate
    for fio truncating averaged values to integers in the log.
    """
    points = []
    with open(fname) as fd:
        for line in fd:
            off, val = line.split(',')[:2]
            points.append((float(off) / 1000, float(val.strip()) + 0.5))

    return TimeSeriesValue(points)
65
66
def load_test_results(cls, folder, run_num):
    """Load one saved test run from `folder` and build a `cls` instance.

    cls: result class to construct (IOTestResult)
    folder: directory with "<run>_params.yaml", "<run>_<conn>_<type>.N.log"
            and "<run>_<conn>_rawres.json" files (see IOPerfTest.do_run)
    run_num: run index used as the file-name prefix
    """
    fn = os.path.join(folder, str(run_num) + '_params.yaml')
    # NOTE: params file is produced by wally itself; yaml.load on untrusted
    # input would be unsafe
    with open(fn) as fd:
        params = yaml.load(fd.read())

    res = {}
    conn_ids_set = set()
    rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
    for fname in os.listdir(folder):
        rm = re.match(rr, fname)
        if rm is None:
            continue

        # ':' is not allowed in file names, so it was saved as '_'
        conn_id = rm.group('conn_id').replace('_', ':')
        ftype = rm.group('type')

        if ftype not in ('iops', 'bw', 'lat'):
            continue

        ts = load_fio_log_file(os.path.join(folder, fname))
        res.setdefault(ftype, {}).setdefault(conn_id, []).append(ts)
        conn_ids_set.add(conn_id)

    # hoisted out of the loop below; also guarantees conn_ids is defined
    # even when no log files were found (it was unbound in that case)
    conn_ids = sorted(conn_ids_set)

    mm_res = {}
    for key, data in res.items():
        matr = [data[conn_id] for conn_id in conn_ids]
        mm_res[key] = MeasurementMatrix(matr, conn_ids)

    raw_res = {}
    for conn_id in conn_ids:
        # BUG FIX: previously the stale conn_id_s from the scan loop above
        # was used here, so the same node's rawres.json was loaded for
        # every connection
        conn_id_s = conn_id.replace(':', '_')
        fn = os.path.join(folder, "{0}_{1}_rawres.json".format(run_num, conn_id_s))

        # fio may prepend status messages before the json document - cut
        # everything up to the first '{'
        with open(fn) as fd:
            fc = "{" + fd.read().split('{', 1)[1]
        raw_res[conn_id] = json.loads(fc)

    fio_task = FioJobSection(params['name'])
    fio_task.vals.update(params['vals'])

    config = TestConfig('io', params, None, params['nodes'], folder, None)
    return cls(config, fio_task, mm_res, raw_res, params['intervals'])
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300125
126
class Attrmapper(object):
    """Read-only attribute view over a dict: ``Attrmapper(d).key == d['key']``.

    Missing keys surface as AttributeError, so the object behaves like a
    plain namespace built from the mapping.
    """

    def __init__(self, dct):
        self.__dct = dct

    def __getattr__(self, attr):
        try:
            return self.__dct[attr]
        except KeyError:
            raise AttributeError(attr)
136
137
class DiskPerfInfo(object):
    """Container for aggregated performance numbers of one test run.

    Constructed nearly empty; the metric fields are filled in later by
    IOTestResult.disk_perf_info().
    """

    def __init__(self, name, summary, params, testnodes_count):
        self.name = name
        self.summary = summary
        self.params = params
        self.testnodes_count = testnodes_count

        # aggregated metrics, assigned by disk_perf_info()
        self.bw = None
        self.iops = None
        self.lat = None
        self.lat_50 = None
        self.lat_95 = None

        # per-VM raw series, assigned by disk_perf_info()
        self.raw_bw = []
        self.raw_iops = []
        self.raw_lat = []

        vals = self.params['vals']
        self.p = Attrmapper(vals)
        self.sync_mode = get_test_sync_mode(vals)
        self.concurence = vals.get('numjobs', 1)
158
159
def get_lat_perc_50_95(lat_mks):
    """Estimate 50th and 95th latency percentiles from a histogram.

    lat_mks maps latency-bucket upper bounds (microseconds) to the
    percentage of requests in that bucket; percentages are assumed to sum
    to ~100. Values are linearly interpolated between adjacent buckets,
    except when the bucket weight is below 1% (too noisy to interpolate).

    Returns (perc_50_ms, perc_95_ms) in milliseconds.
    """
    accumulated = 0
    perc_50 = None
    perc_95 = None
    prev_key = None

    for key, val in sorted(lat_mks.items()):
        if perc_50 is None and accumulated + val >= 50:
            if prev_key is None or val < 1.:
                perc_50 = key
            else:
                perc_50 = (50. - accumulated) / val * (key - prev_key) + prev_key

        if accumulated + val >= 95:
            if prev_key is None or val < 1.:
                perc_95 = key
            else:
                perc_95 = (95. - accumulated) / val * (key - prev_key) + prev_key
            break

        prev_key = key
        accumulated += val

    # convert us -> ms
    return perc_50 / 1000., perc_95 / 1000.
183
184
class IOTestResult(TestResults):
    """
    Fio run results.

    config: TestConfig
    fio_task: FioJobSection
    ts_results: {str: MeasurementMatrix[TimeSeriesValue]} - per-metric
                ('bw'/'lat'/'iops') matrices of per-node time series
    raw_result: {conn_id: parsed fio json report} - schema is whatever fio
                emits with --output-format=json (keys 'jobs', 'latency_ms',
                'latency_us', 'mixed' are relied on below)
    run_interval: (float, float) - test run time, used for sensors
    """
    def __init__(self, config, fio_task, ts_results, raw_result, run_interval):

        # job name prefix before the first '_' is used as the test name
        self.name = fio_task.name.split("_")[0]
        self.fio_task = fio_task

        # any of these may be None if the corresponding log type is absent
        self.bw = ts_results.get('bw')
        self.lat = ts_results.get('lat')
        self.iops = ts_results.get('iops')
        # self.iops_from_lat = ts_results.get('iops_from_lat')

        # self.slat = drop_warmup(res.get('clat', None), self.params)
        # self.clat = drop_warmup(res.get('slat', None), self.params)

        res = {"bw": self.bw, "lat": self.lat, "iops": self.iops}

        self.sensors_data = None
        # cache for disk_perf_info()
        self._pinfo = None
        TestResults.__init__(self, config, res, raw_result, run_interval)

    def get_params_from_fio_report(self):
        """Extract per-node iops/bw aggregates from the fio json reports.

        Returns a dict with two variants of each metric: the value fio
        itself reports ('iops'/'bw') and one recomputed from totals and
        runtime ('flt_iops'/'flt_bw').
        """
        nodes = self.bw.connections_ids

        # NOTE(review): only jobs[0]['mixed'] is read - assumes one job per
        # report and mixed read/write stats enabled; confirm against the
        # generated fio job files
        iops = [self.raw_result[node]['jobs'][0]['mixed']['iops'] for node in nodes]
        total_ios = [self.raw_result[node]['jobs'][0]['mixed']['total_ios'] for node in nodes]
        runtime = [self.raw_result[node]['jobs'][0]['mixed']['runtime'] / 1000 for node in nodes]
        flt_iops = [float(ios) / rtime for ios, rtime in zip(total_ios, runtime)]

        bw = [self.raw_result[node]['jobs'][0]['mixed']['bw'] for node in nodes]
        total_bytes = [self.raw_result[node]['jobs'][0]['mixed']['io_bytes'] for node in nodes]
        flt_bw = [float(tbytes) / rtime for tbytes, rtime in zip(total_bytes, runtime)]

        return {'iops': iops,
                'flt_iops': flt_iops,
                'bw': bw,
                'flt_bw': flt_bw}

    def summary(self):
        """Short test id: fio summary + "vm" + number of test nodes."""
        return get_test_summary(self.fio_task) + "vm" \
               + str(len(self.config.nodes))

    def get_yamable(self):
        """Representation stored in yaml result files."""
        return self.summary()

    def get_lat_perc_50_95_multy(self):
        """Merge latency histograms of all nodes/jobs and return the
        (50th, 95th) percentile estimates in ms (see get_lat_perc_50_95)."""
        lat_mks = collections.defaultdict(lambda: 0)
        num_res = 0

        for result in self.raw_result.values():
            num_res += len(result['jobs'])
            for job_info in result['jobs']:
                # fio's latency_ms keys may be plain numbers or ">=N" for
                # the open-ended last bucket; everything is converted to us
                for k, v in job_info['latency_ms'].items():
                    # NOTE: basestring - this module is Python 2 code
                    if isinstance(k, basestring) and k.startswith('>='):
                        lat_mks[int(k[2:]) * 1000] += v
                    else:
                        lat_mks[int(k) * 1000] += v

                for k, v in job_info['latency_us'].items():
                    lat_mks[int(k)] += v

        # average the accumulated percentages over all jobs
        # (items() returns a list under Python 2, so mutating is safe here)
        for k, v in lat_mks.items():
            lat_mks[k] = float(v) / num_res
        return get_lat_perc_50_95(lat_mks)

    def disk_perf_info(self, avg_interval=2.0):
        """Build (and cache) a DiskPerfInfo with aggregated bw/iops/lat.

        avg_interval: minimal averaging window (seconds) for the raw
        time series before aggregation.
        """
        if self._pinfo is not None:
            return self._pinfo

        testnodes_count = len(self.config.nodes)

        pinfo = DiskPerfInfo(self.name,
                             self.summary(),
                             self.params,
                             testnodes_count)

        # ramp_time = self.fio_task.vals.get('ramp_time', 0)

        def prepare(data, drop=1):
            # Normalize one node's list of time series: re-average to
            # avg_interval and drop the last `drop` points.
            if data is None:
                return data

            res = []
            for ts_data in data:
                # if ramp_time > 0:
                #     ts_data = ts_data.skip(ramp_time)

                if ts_data.average_interval() < avg_interval:
                    ts_data = ts_data.derived(avg_interval)

                # drop last value on bounds
                # as they may contain ranges without activity
                assert len(ts_data.values) >= drop + 1

                if drop > 0:
                    res.append(ts_data.values[:-drop])
                else:
                    res.append(ts_data.values)

            return res

        def agg_data(matr):
            # Sum value lists element-wise across all threads of all nodes,
            # truncated to the shortest list.
            arr = sum(matr, [])
            min_len = min(map(len, arr))
            res = []
            for idx in range(min_len):
                res.append(sum(dt[idx] for dt in arr))
            return res

        # pinfo.raw_lat = map(prepare, self.lat.per_vm())
        # num_th = sum(map(len, pinfo.raw_lat))
        # avg_lat = [val / num_th for val in agg_data(pinfo.raw_lat)]
        # pinfo.lat = data_property(avg_lat)
        pinfo.lat_50, pinfo.lat_95 = self.get_lat_perc_50_95_multy()
        pinfo.lat = pinfo.lat_50

        # NOTE: map() returns a list under Python 2, as required below
        pinfo.raw_bw = map(prepare, self.bw.per_vm())
        pinfo.raw_iops = map(prepare, self.iops.per_vm())

        iops_per_th = sum(sum(pinfo.raw_iops, []), [])

        fparams = self.get_params_from_fio_report()
        fio_report_bw = sum(fparams['flt_bw'])
        fio_report_iops = sum(fparams['flt_iops'])

        agg_bw = agg_data(pinfo.raw_bw)
        agg_iops = agg_data(pinfo.raw_iops)

        log_bw_avg = average(agg_bw)
        log_iops_avg = average(agg_iops)

        # scale the log-derived series so their average matches the totals
        # from the fio json report
        coef_iops = fio_report_iops / float(log_iops_avg)
        coef_bw = fio_report_bw / float(log_bw_avg)

        bw_log = data_property([val * coef_bw for val in agg_bw])
        iops_log = data_property([val * coef_iops for val in agg_iops])

        bw_report = data_property([fio_report_bw])
        iops_report = data_property([fio_report_iops])

        # When IOPS/BW per thread is too low the log values are heavily
        # rounded, so prefer the report numbers; otherwise prefer the logs
        # (the other variant is kept in bw2/iops2 either way)
        if average(iops_per_th) > 10:
            pinfo.bw = bw_log
            pinfo.iops = iops_log
            pinfo.bw2 = bw_report
            pinfo.iops2 = iops_report
        else:
            pinfo.bw = bw_report
            pinfo.iops = iops_report
            pinfo.bw2 = bw_log
            pinfo.iops2 = iops_log

        self._pinfo = pinfo

        return pinfo
350
351
class IOPerfTest(PerfTest):
    """fio-based IO performance test.

    Prepares the test nodes (tools + data files), runs the compiled fio
    jobs on all nodes in parallel and collects per-node logs and reports.
    """

    # NOTE(review): these constants are not referenced anywhere in this
    # file - presumably used by callers/subclasses; confirm before removal
    tcp_conn_timeout = 30
    max_pig_timeout = 5
    soft_runcycle = 5 * 60  # seconds
356
357 def __init__(self, config):
358 PerfTest.__init__(self, config)
359
360 get = self.config.params.get
361 do_get = self.config.params.__getitem__
362
363 self.config_fname = do_get('cfg')
364
365 if '/' not in self.config_fname and '.' not in self.config_fname:
366 cfgs_dir = os.path.dirname(__file__)
367 self.config_fname = os.path.join(cfgs_dir,
368 self.config_fname + '.cfg')
369
370 self.alive_check_interval = get('alive_check_interval')
371 self.use_system_fio = get('use_system_fio', False)
372
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300373 if get('prefill_files') is not None:
374 logger.warning("prefill_files option is depricated. Use force_prefill instead")
375
376 self.force_prefill = get('force_prefill', False)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300377 self.config_params = get('params', {}).copy()
378
379 self.io_py_remote = self.join_remote("agent.py")
380 self.results_file = self.join_remote("results.json")
381 self.pid_file = self.join_remote("pid")
382 self.task_file = self.join_remote("task.cfg")
383 self.sh_file = self.join_remote("cmd.sh")
384 self.err_out_file = self.join_remote("fio_err_out")
385 self.exit_code_file = self.join_remote("exit_code")
386
koder aka kdanilovbb6d6cd2015-06-20 02:55:07 +0300387 self.max_latency = get("max_lat", None)
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300388 self.min_bw_per_thread = get("min_bw", None)
koder aka kdanilovbb6d6cd2015-06-20 02:55:07 +0300389
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300390 self.use_sudo = get("use_sudo", True)
391 self.test_logging = get("test_logging", False)
392
393 self.raw_cfg = open(self.config_fname).read()
394 self.fio_configs = fio_cfg_compile(self.raw_cfg,
395 self.config_fname,
396 self.config_params,
397 split_on_names=self.test_logging)
398 self.fio_configs = list(self.fio_configs)
399
400 @classmethod
401 def load(cls, folder):
402 for fname in os.listdir(folder):
403 if re.match("\d+_params.yaml$", fname):
404 num = int(fname.split('_')[0])
405 yield load_test_results(IOTestResult, folder, num)
406
407 def cleanup(self):
408 # delete_file(conn, self.io_py_remote)
409 # Need to remove tempo files, used for testing
410 pass
411
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300412 def check_prefill_required(self, rossh, fname, size, num_blocks=16):
koder aka kdanilovf95cfc12015-06-23 03:33:19 +0300413 try:
414 data = rossh("ls -l " + fname, nolog=True)
415 except:
416 return True
417
418 sz = data.split()[4]
419 if int(sz) / (1024 ** 2) < size:
420 return True
421
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300422 cmd = """python -c "import sys; fd = open('{0}', 'rb');""" + \
koder aka kdanilovf95cfc12015-06-23 03:33:19 +0300423 """fd.seek({1}); sys.stdout.write(fd.read(1024))" | md5sum"""
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300424
425 if self.use_sudo:
426 cmd = "sudo " + cmd
427
koder aka kdanilovf95cfc12015-06-23 03:33:19 +0300428 zero_md5 = '0f343b0931126a20f133d67c2b018a3b'
429 offsets = [random.randrange(size * 1024) for _ in range(num_blocks)]
430 offsets.append(size * 1024 - 1024)
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300431
koder aka kdanilovf95cfc12015-06-23 03:33:19 +0300432 for offset in offsets:
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300433 data = rossh(cmd.format(fname, offset), nolog=True)
koder aka kdanilovf95cfc12015-06-23 03:33:19 +0300434 md = data.split()[0].strip()
435
436 if len(md) != 32:
437 logger.error("File data check is failed - " + data)
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300438 return True
koder aka kdanilovf95cfc12015-06-23 03:33:19 +0300439
440 if zero_md5 == md:
441 return True
442
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300443 return False
444
445 def prefill_test_files(self, rossh, files, force=False):
446 if self.use_system_fio:
447 cmd_templ = "fio "
448 else:
449 cmd_templ = "{0}/fio ".format(self.config.remote_dir)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300450
451 if self.use_sudo:
452 cmd_templ = "sudo " + cmd_templ
453
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300454 cmd_templ += "--name=xxx --filename={0} --direct=1" + \
455 " --bs=4m --size={1}m --rw=write"
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300456
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300457 ssize = 0
458
459 if force:
460 logger.info("File prefilling is forced")
461
462 ddtime = 0
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300463 for fname, curr_sz in files.items():
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300464 if not force:
465 if not self.check_prefill_required(rossh, fname, curr_sz):
koder aka kdanilovf95cfc12015-06-23 03:33:19 +0300466 print "prefill is skipped"
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300467 continue
468
469 logger.info("Prefilling file {0}".format(fname))
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300470 cmd = cmd_templ.format(fname, curr_sz)
471 ssize += curr_sz
472
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300473 stime = time.time()
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300474 rossh(cmd, timeout=curr_sz)
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300475 ddtime += time.time() - stime
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300476
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300477 if ddtime > 1.0:
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300478 fill_bw = int(ssize / ddtime)
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300479 mess = "Initiall fio fill bw is {0} MiBps for this vm"
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300480 logger.info(mess.format(fill_bw))
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300481
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300482 def install_utils(self, node, rossh, max_retry=3, timeout=5):
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300483 need_install = []
484 packs = [('screen', 'screen')]
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300485 os_info = get_os(rossh)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300486
487 if self.use_system_fio:
488 packs.append(('fio', 'fio'))
489 else:
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300490 packs.append(('bzip2', 'bzip2'))
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300491
492 for bin_name, package in packs:
493 if bin_name is None:
494 need_install.append(package)
495 continue
496
497 try:
498 rossh('which ' + bin_name, nolog=True)
499 except OSError:
500 need_install.append(package)
501
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300502 if len(need_install) != 0:
503 if 'redhat' == os_info.distro:
504 cmd = "sudo yum -y install " + " ".join(need_install)
505 else:
506 cmd = "sudo apt-get -y install " + " ".join(need_install)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300507
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300508 for _ in range(max_retry):
509 try:
510 rossh(cmd)
511 break
512 except OSError as err:
513 time.sleep(timeout)
514 else:
515 raise OSError("Can't install - " + str(err))
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300516
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300517 if not self.use_system_fio:
518 fio_dir = os.path.dirname(os.path.dirname(wally.__file__))
519 fio_dir = os.path.join(os.getcwd(), fio_dir)
520 fio_dir = os.path.join(fio_dir, 'fio_binaries')
521 fname = 'fio_{0.release}_{0.arch}.bz2'.format(os_info)
522 fio_path = os.path.join(fio_dir, fname)
523
524 if not os.path.exists(fio_path):
525 raise RuntimeError("No prebuild fio available for {0}".format(os_info))
526
527 bz_dest = self.join_remote('fio.bz2')
528 with node.connection.open_sftp() as sftp:
529 sftp.put(fio_path, bz_dest)
530
531 rossh("bzip2 --decompress " + bz_dest, nolog=True)
532 rossh("chmod a+x " + self.join_remote("fio"), nolog=True)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300533
534 def pre_run(self):
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300535 files = {}
536 for section in self.fio_configs:
537 sz = ssize2b(section.vals['size'])
538 msz = sz / (1024 ** 2)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300539
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300540 if sz % (1024 ** 2) != 0:
541 msz += 1
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300542
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300543 fname = section.vals['filename']
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300544
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300545 # if already has other test with the same file name
546 # take largest size
547 files[fname] = max(files.get(fname, 0), msz)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300548
549 with ThreadPoolExecutor(len(self.config.nodes)) as pool:
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300550 fc = functools.partial(self.pre_run_th,
551 files=files,
552 force=self.force_prefill)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300553 list(pool.map(fc, self.config.nodes))
554
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300555 def pre_run_th(self, node, files, force):
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300556 # fill files with pseudo-random data
557 rossh = run_on_node(node)
558
559 try:
560 cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
561 if self.use_sudo:
562 cmd = "sudo " + cmd
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300563 cmd += " ; sudo chown {0} {1}".format(node.get_user(),
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300564 self.config.remote_dir)
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300565 rossh(cmd, nolog=True)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300566
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300567 assert self.config.remote_dir != "" and self.config.remote_dir != "/"
568 rossh("rm -rf {0}/*".format(self.config.remote_dir), nolog=True)
569
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300570 except Exception as exc:
571 msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300572 msg = msg.format(self.config.remote_dir, node.get_conn_id(), exc)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300573 logger.exception(msg)
574 raise StopTestError(msg, exc)
575
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300576 self.install_utils(node, rossh)
577 self.prefill_test_files(rossh, files, force)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300578
koder aka kdanilov6ab4d432015-06-22 00:26:28 +0300579 def show_test_execution_time(self):
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300580 if len(self.fio_configs) > 1:
581 # +10% - is a rough estimation for additional operations
582 # like sftp, etc
583 exec_time = int(sum(map(execution_time, self.fio_configs)) * 1.1)
584 exec_time_s = sec_to_str(exec_time)
585 now_dt = datetime.datetime.now()
586 end_dt = now_dt + datetime.timedelta(0, exec_time)
587 msg = "Entire test should takes aroud: {0} and finished at {1}"
588 logger.info(msg.format(exec_time_s,
589 end_dt.strftime("%H:%M:%S")))
590
    def run(self):
        """Execute all compiled fio jobs on every node and collect results.

        Jobs are sorted by thread count and run one after another; each job
        runs on all nodes simultaneously (synchronized via Barrier inside
        do_run). Per-job artifacts are written into config.log_directory
        and re-loaded through load_test_results().

        Returns a list of IOTestResult, one per executed job.
        Raises StopTestError after 3 failed attempts of a job.
        """
        logger.debug("Run preparation")
        self.pre_run()
        self.show_test_execution_time()

        tname = os.path.basename(self.config_fname)
        if tname.endswith('.cfg'):
            tname = tname[:-4]

        barrier = Barrier(len(self.config.nodes))
        results = []

        # set of Operation_Mode_BlockSize strings which should not be
        # tested anymore, as they were already too slow with a previous
        # (smaller) thread count
        lat_bw_limit_reached = set()

        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
            # run low-concurrency jobs first so limits trigger early
            self.fio_configs.sort(key=lambda x: int(x.vals.get('numjobs', 1)))

            for pos, fio_cfg in enumerate(self.fio_configs):
                # summary without the thread-count suffix identifies the
                # job family for the limit bookkeeping
                test_descr = get_test_summary(fio_cfg.vals).split("th")[0]
                if test_descr in lat_bw_limit_reached:
                    continue
                else:
                    logger.info("Will run {0} test".format(fio_cfg.name))

                templ = "Test should takes about {0}." + \
                        " Should finish at {1}," + \
                        " will wait at most till {2}"
                exec_time = execution_time(fio_cfg)
                exec_time_str = sec_to_str(exec_time)
                # hard timeout: at least 5 minutes of slack, or 2x runtime
                timeout = int(exec_time + max(300, exec_time))

                now_dt = datetime.datetime.now()
                end_dt = now_dt + datetime.timedelta(0, exec_time)
                wait_till = now_dt + datetime.timedelta(0, timeout)

                logger.info(templ.format(exec_time_str,
                                         end_dt.strftime("%H:%M:%S"),
                                         wait_till.strftime("%H:%M:%S")))

                func = functools.partial(self.do_run,
                                         barrier=barrier,
                                         fio_cfg=fio_cfg,
                                         pos=pos)

                # retry whole-job failures caused by env/ssh errors
                max_retr = 3
                for idx in range(max_retr):
                    try:
                        intervals = list(pool.map(func, self.config.nodes))
                        break
                    except (EnvironmentError, SSHException) as exc:
                        logger.exception("During fio run")
                        if idx == max_retr - 1:
                            raise StopTestError("Fio failed", exc)

                        logger.info("Sleeping 30s and retrying")
                        time.sleep(30)

                # persist the exact fio job text for this run
                fname = "{0}_task.fio".format(pos)
                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
                    fd.write(str(fio_cfg))

                # persist run metadata consumed later by load_test_results()
                params = {'vm_count': len(self.config.nodes)}
                params['name'] = fio_cfg.name
                params['vals'] = dict(fio_cfg.vals.items())
                params['intervals'] = intervals
                params['nodes'] = [node.get_conn_id() for node in self.config.nodes]

                fname = "{0}_params.yaml".format(pos)
                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
                    fd.write(dumps(params))

                res = load_test_results(IOTestResult, self.config.log_directory, pos)
                results.append(res)

                if self.max_latency is not None:
                    lat_50, _ = res.get_lat_perc_50_95_multy()

                    # NOTE(review): get_lat_perc_50_95 already returns ms,
                    # so max_latency is presumably configured in ms too -
                    # confirm (the old "conver us to ms" note looks stale)
                    if self.max_latency < lat_50:
                        logger.info(("Will skip all subsequent tests of {0} " +
                                     "due to lat/bw limits").format(fio_cfg.name))
                        lat_bw_limit_reached.add(test_descr)

                test_res = res.get_params_from_fio_report()
                if self.min_bw_per_thread is not None:
                    if self.min_bw_per_thread > average(test_res['bw']):
                        lat_bw_limit_reached.add(test_descr)

        return results
683
    def do_run(self, node, barrier, fio_cfg, pos, nolog=False):
        """Run one fio job on one node and download its artifacts.

        Uploads the job file and a wrapper cmd.sh, waits on `barrier` so
        all nodes start simultaneously, runs fio in the background via
        BGSSHTask, then packs the json report and per-type .log files into
        a tar.gz, downloads and unpacks it into config.log_directory under
        names "{pos}_{conn_id}_..." (consumed by load_test_results).

        node: target test node
        barrier: cross-node start synchronization
        fio_cfg: FioJobSection to execute
        pos: run index, used as artifact file-name prefix
        nolog: suppress the per-node completion debug message
        Returns (begin, end) wall-clock timestamps of the run.
        Raises StopTestError when fio exits with a non-zero code.
        """
        if self.use_sudo:
            sudo = "sudo "
        else:
            sudo = ""

        # wrapper script: run fio, capture combined output and exit code
        bash_file = "#!/bin/bash\n" + \
                    "cd {exec_folder}\n" + \
                    "{fio_path}fio --output-format=json --output={out_file} " + \
                    "--alloc-size=262144 {job_file} " + \
                    " >{err_out_file} 2>&1 \n" + \
                    "echo $? >{res_code_file}\n"

        exec_folder = self.config.remote_dir

        if self.use_system_fio:
            fio_path = ""
        else:
            if not exec_folder.endswith("/"):
                fio_path = exec_folder + "/"
            else:
                fio_path = exec_folder

        bash_file = bash_file.format(out_file=self.results_file,
                                     job_file=self.task_file,
                                     err_out_file=self.err_out_file,
                                     res_code_file=self.exit_code_file,
                                     exec_folder=exec_folder,
                                     fio_path=fio_path)

        with node.connection.open_sftp() as sftp:
            save_to_remote(sftp, self.task_file, str(fio_cfg))
            save_to_remote(sftp, self.sh_file, bash_file)

        exec_time = execution_time(fio_cfg)

        # hard timeout: at least 5 minutes of slack, or 2x expected runtime
        timeout = int(exec_time + max(300, exec_time))
        soft_tout = exec_time

        begin = time.time()

        # snapshot of the remote dir, used below to detect new log files
        fnames_before = run_on_node(node)("ls -1 " + exec_folder, nolog=True)

        # wait for every node's worker to reach this point
        barrier.wait()

        task = BGSSHTask(node, self.use_sudo)
        task.start(sudo + "bash " + self.sh_file)

        # fio keeps running remotely even if the ssh transport drops;
        # on SSHException just reconnect and continue waiting
        while True:
            try:
                task.wait(soft_tout, timeout)
                break
            except paramiko.SSHException:
                pass

            try:
                node.connection.close()
            except:
                # NOTE(review): bare except - best-effort close only
                pass

            reconnect(node.connection, node.conn_url)

        end = time.time()
        rossh = run_on_node(node)
        fnames_after = rossh("ls -1 " + exec_folder, nolog=True)

        # ':' is not allowed in file names
        conn_id = node.get_conn_id().replace(":", "_")
        if not nolog:
            logger.debug("Test on node {0} is finished".format(conn_id))

        # expected log-file prefixes, derived from the fio job options
        log_files_pref = []
        if 'write_lat_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_lat_log']
            log_files_pref.append(fname + '_clat')
            log_files_pref.append(fname + '_lat')
            log_files_pref.append(fname + '_slat')

        if 'write_iops_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_iops_log']
            log_files_pref.append(fname + '_iops')

        if 'write_bw_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_bw_log']
            log_files_pref.append(fname + '_bw')

        # {log_type: [(thread_idx, fname), ...]}
        files = collections.defaultdict(lambda: [])
        all_files = [os.path.basename(self.results_file)]
        new_files = set(fnames_after.split()) - set(fnames_before.split())

        for fname in new_files:
            if fname.endswith('.log') and fname.split('.')[0] in log_files_pref:
                name, _ = os.path.splitext(fname)
                # "prefix_type.log" vs "prefix_type.N.log" (per-thread)
                if fname.count('.') == 1:
                    tp = name.split("_")[-1]
                    cnt = 0
                else:
                    tp_cnt = name.split("_")[-1]
                    tp, cnt = tp_cnt.split('.')
                files[tp].append((int(cnt), fname))
                all_files.append(fname)

        arch_name = self.join_remote('wally_result.tar.gz')
        tmp_dir = os.path.join(self.config.log_directory, 'tmp_' + conn_id)
        os.mkdir(tmp_dir)
        loc_arch_name = os.path.join(tmp_dir, 'wally_result.{0}.tar.gz'.format(conn_id))
        file_full_names = " ".join(all_files)

        try:
            os.unlink(loc_arch_name)
        except:
            # NOTE(review): bare except - archive may simply not exist yet
            pass

        with node.connection.open_sftp() as sftp:
            exit_code = read_from_remote(sftp, self.exit_code_file)
            err_out = read_from_remote(sftp, self.err_out_file)
            exit_code = exit_code.strip()

            if exit_code != '0':
                msg = "fio exit with code {0}: {1}".format(exit_code, err_out)
                logger.critical(msg.strip())
                raise StopTestError("fio failed")

            # pack all artifacts remotely and download as one archive
            rossh("rm -f {0}".format(arch_name), nolog=True)
            pack_files_cmd = "cd {0} ; tar zcvf {1} {2}".format(exec_folder, arch_name, file_full_names)
            rossh(pack_files_cmd, nolog=True)
            sftp.get(arch_name, loc_arch_name)

        unpack_files_cmd = "cd {0} ; tar xvzf {1} >/dev/null".format(tmp_dir, loc_arch_name)
        subprocess.check_call(unpack_files_cmd, shell=True)
        os.unlink(loc_arch_name)

        # rename per-thread logs into the "{pos}_{conn}_{type}.{idx}.log"
        # scheme expected by load_test_results()
        for ftype, fls in files.items():
            for idx, fname in fls:
                cname = os.path.join(tmp_dir, fname)
                loc_fname = "{0}_{1}_{2}.{3}.log".format(pos, conn_id, ftype, idx)
                loc_path = os.path.join(self.config.log_directory, loc_fname)
                os.rename(cname, loc_path)

        # and the fio json report into "{pos}_{conn}_rawres.json"
        cname = os.path.join(tmp_dir,
                             os.path.basename(self.results_file))
        loc_fname = "{0}_{1}_rawres.json".format(pos, conn_id)
        loc_path = os.path.join(self.config.log_directory, loc_fname)
        os.rename(cname, loc_path)
        os.rmdir(tmp_dir)

        # clean the artifacts up on the remote side
        remove_remote_res_files_cmd = "cd {0} ; rm -f {1} {2}".format(exec_folder,
                                                                      arch_name,
                                                                      file_full_names)
        rossh(remove_remote_res_files_cmd, nolog=True)
        return begin, end
834
835 @classmethod
koder aka kdanilov6b872662015-06-23 01:58:36 +0300836 def prepare_data(cls, results):
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300837 """
838 create a table with io performance report
839 for console
840 """
841
842 def getconc(data):
843 th_count = data.params['vals'].get('numjobs')
844
845 if th_count is None:
846 th_count = data.params['vals'].get('concurence', 1)
847 return th_count
848
849 def key_func(data):
850 p = data.params['vals']
851
852 th_count = getconc(data)
853
854 return (data.name.rsplit("_", 1)[0],
855 p['rw'],
koder aka kdanilovbb6d6cd2015-06-20 02:55:07 +0300856 get_test_sync_mode(data.params['vals']),
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300857 ssize2b(p['blocksize']),
koder aka kdanilovbb6d6cd2015-06-20 02:55:07 +0300858 int(th_count) * len(data.config.nodes))
koder aka kdanilov6b872662015-06-23 01:58:36 +0300859 res = []
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300860
koder aka kdanilovbb6d6cd2015-06-20 02:55:07 +0300861 for item in sorted(results, key=key_func):
koder aka kdanilovbb6d6cd2015-06-20 02:55:07 +0300862 test_dinfo = item.disk_perf_info()
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300863
864 iops, _ = test_dinfo.iops.rounded_average_conf()
865
866 bw, bw_conf = test_dinfo.bw.rounded_average_conf()
867 _, bw_dev = test_dinfo.bw.rounded_average_dev()
868 conf_perc = int(round(bw_conf * 100 / bw))
869 dev_perc = int(round(bw_dev * 100 / bw))
870
koder aka kdanilov6b872662015-06-23 01:58:36 +0300871 lat_50 = round_3_digit(int(test_dinfo.lat_50))
872 lat_95 = round_3_digit(int(test_dinfo.lat_95))
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300873
koder aka kdanilovbb6d6cd2015-06-20 02:55:07 +0300874 testnodes_count = len(item.config.nodes)
875 iops_per_vm = round_3_digit(iops / testnodes_count)
876 bw_per_vm = round_3_digit(bw / testnodes_count)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300877
878 iops = round_3_digit(iops)
879 bw = round_3_digit(bw)
880
koder aka kdanilov6b872662015-06-23 01:58:36 +0300881 res.append({"name": item.name.rsplit('_', 1)[0],
882 "key": key_func(item),
883 "summ": item.summary()[3:],
884 "iops": int(iops),
885 "bw": int(bw),
886 "iops_conf": str(conf_perc),
887 "iops_dev": str(dev_perc),
888 "iops_per_vm": int(iops_per_vm),
889 "bw_per_vm": int(bw_per_vm),
890 "lat_50": lat_50,
891 "lat_95": lat_95})
koder aka kdanilovbc2c8982015-06-13 02:50:43 +0300892
koder aka kdanilov6b872662015-06-23 01:58:36 +0300893 return res
894
    # Column descriptor for the console report tables:
    #   header - (possibly multi-line) column title
    #   attr   - key into the per-test dicts built by prepare_data()
    #   allign - texttable alignment code, 'l' or 'r' (sic: "allign")
    #   size   - width of the dashed group-separator cell for this column
    Field = collections.namedtuple("Field", ("header", "attr", "allign", "size"))
    # Ordered column layout used by format_for_console() (sic: "fiels")
    fiels_and_header = [
        Field("Name", "name", "l", 7),
        Field("Description", "summ", "l", 10),
        Field("IOPS\ncum", "iops", "r", 3),
        Field("KiBps\ncum", "bw", "r", 3),
        Field("Cnf %\n95%", "iops_conf", "r", 3),
        Field("Dev%", "iops_dev", "r", 3),
        Field("iops\nper vm", "iops_per_vm", "r", 3),
        Field("KiBps\nper vm", "bw_per_vm", "r", 3),
        Field("lat ms\nmedian", "lat_50", "r", 3),
        Field("lat ms\n95%", "lat_95", "r", 3)
    ]

    # attr-name -> Field lookup, used by format_diff_for_console()
    fiels_and_header_dct = dict((item.attr, item) for item in fiels_and_header)
910
911 @classmethod
912 def format_for_console(cls, results):
913 """
914 create a table with io performance report
915 for console
916 """
917
918 tab = texttable.Texttable(max_width=120)
919 tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
920 tab.set_cols_align([f.allign for f in cls.fiels_and_header])
921 sep = ["-" * f.size for f in cls.fiels_and_header]
922 tab.header([f.header for f in cls.fiels_and_header])
923
924 prev_k = None
925 for item in cls.prepare_data(results):
926 curr_k = item['summ'][:4]
927 if prev_k is not None:
928 if prev_k != curr_k:
929 tab.add_row(sep)
930
931 prev_k = curr_k
932 tab.add_row([item[f.attr] for f in cls.fiels_and_header])
933
934 return tab.draw()
935
936 @classmethod
937 def format_diff_for_console(cls, list_of_results):
938 """
939 create a table with io performance report
940 for console
941 """
942
943 tab = texttable.Texttable(max_width=200)
944 tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
945
946 header = [
947 cls.fiels_and_header_dct["name"].header,
948 cls.fiels_and_header_dct["summ"].header,
949 ]
950 allign = ["l", "l"]
951
952 header.append("IOPS ~ Cnf% ~ Dev%")
953 allign.extend(["r"] * len(list_of_results))
954 header.extend(
955 "IOPS_{0} %".format(i + 2) for i in range(len(list_of_results[1:]))
956 )
957
958 header.append("BW")
959 allign.extend(["r"] * len(list_of_results))
960 header.extend(
961 "BW_{0} %".format(i + 2) for i in range(len(list_of_results[1:]))
962 )
963
964 header.append("LAT")
965 allign.extend(["r"] * len(list_of_results))
966 header.extend(
967 "LAT_{0}".format(i + 2) for i in range(len(list_of_results[1:]))
968 )
969
970 tab.header(header)
971 sep = ["-" * 3] * len(header)
972 processed_results = map(cls.prepare_data, list_of_results)
973
974 key2results = []
975 for res in processed_results:
976 key2results.append(dict(
977 ((item["name"], item["summ"]), item) for item in res
978 ))
979
980 prev_k = None
981 iops_frmt = "{0[iops]} ~ {0[iops_conf]:>2} ~ {0[iops_dev]:>2}"
982 for item in processed_results[0]:
983 curr_k = item['summ'][:4]
984 if prev_k is not None:
985 if prev_k != curr_k:
986 tab.add_row(sep)
987
988 prev_k = curr_k
989
990 key = (item['name'], item['summ'])
991 line = list(key)
992 base = key2results[0][key]
993
994 line.append(iops_frmt.format(base))
995
996 for test_results in key2results[1:]:
997 val = test_results.get(key)
998 if val is None:
999 line.append("-")
1000 elif base['iops'] == 0:
1001 line.append("Nan")
1002 else:
1003 prc_val = {'iops_dev': val['iops_dev'],
1004 'iops_conf': val['iops_conf']}
1005 prc_val['iops'] = int(100 * val['iops'] / base['iops'])
1006 line.append(iops_frmt.format(prc_val))
1007
1008 line.append(base['bw'])
1009
1010 for test_results in key2results[1:]:
1011 val = test_results.get(key)
1012 if val is None:
1013 line.append("-")
1014 elif base['bw'] == 0:
1015 line.append("Nan")
1016 else:
1017 line.append(int(100 * val['bw'] / base['bw']))
1018
1019 for test_results in key2results:
1020 val = test_results.get(key)
1021 if val is None:
1022 line.append("-")
1023 else:
1024 line.append("{0[lat_50]} - {0[lat_95]}".format(val))
1025
1026 tab.add_row(line)
1027
1028 tab.set_cols_align(allign)
koder aka kdanilovbc2c8982015-06-13 02:50:43 +03001029 return tab.draw()