blob: afb6ac1d9f705d6bed29454ff817f3d0f36c31be [file] [log] [blame]
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +03001import time
2import json
3import os.path
4import logging
5import datetime
6
7from wally.utils import (ssize2b, open_for_append_or_create,
8 sec_to_str, StopTestError)
9
10from wally.ssh_utils import save_to_remote, read_from_remote, BGSSHTask
11
12from ..itest import IPerfTest, TestResults
13from .formatter import format_results_for_console
14from .fio_task_parser import (execution_time, fio_cfg_compile,
15 get_test_summary, FioJobSection)
16
17
18logger = logging.getLogger("wally")
19
20
class IOTestResults(TestResults):
    """Container for the results of a single fio test run.

    Adds fio-specific serialization on top of the generic TestResults:
    a short textual summary and lossless to/from plain-structures
    conversion for yaml storage.
    """

    def summary(self):
        # short test id: fio job summary + "vm" + number of test vms
        return "{0}vm{1}".format(get_test_summary(self.config),
                                 self.vm_count)

    def get_yamable(self):
        """Return a yaml-friendly plain-structure representation."""
        yamable = {
            'type': "fio_test",
            'params': self.params,
            'config': (self.config.name, self.config.vals),
            'results': self.results,
            'raw_result': self.raw_result,
            'run_interval': self.run_interval,
            'vm_count': self.vm_count,
        }
        return yamable

    @classmethod
    def from_yaml(cls, data):
        """Rebuild an instance from get_yamable() output (inverse op)."""
        cfg_name, cfg_vals = data['config']
        section = FioJobSection(cfg_name)
        section.vals = cfg_vals
        return cls(section, data['params'], data['results'],
                   data['raw_result'], data['run_interval'],
                   data['vm_count'])
45
46
def get_slice_parts_offset(test_slice, real_inteval):
    """Yield a (start, end) wall-clock interval for every section.

    The real interval (``real_inteval``, a (begin, end) pair — name typo
    kept for caller compatibility) is split between sections
    proportionally to each section's planned execution time.
    """
    begin = real_inteval[0]
    total_planned = sum(map(execution_time, test_slice))
    # scale factor: real elapsed time per planned second
    scale = (real_inteval[1] - begin) / total_planned

    pos = begin
    for sec in test_slice:
        duration = execution_time(sec) * scale
        yield (pos, pos + duration)
        pos += duration
55
56
class IOPerfTest(IPerfTest):
    """Run a fio-based IO load on a remote node over ssh.

    Compiles a fio config file into slices of job sections, optionally
    prefills test files, runs each slice remotely via a background ssh
    task and post-processes the json output into IOTestResults.
    """

    # timeout for tcp connection to the test node, seconds
    tcp_conn_timeout = 30
    # NOTE(review): not referenced in this chunk — presumably max time to
    # wait for the remote pid file; confirm against the rest of the project
    max_pig_timeout = 5
    # soft limit for a single run cycle, seconds
    soft_runcycle = 5 * 60

    def __init__(self, *dt, **mp):
        IPerfTest.__init__(self, *dt, **mp)
        self.config_fname = self.options['cfg']

        # a bare config name (no path, no extension) refers to a .cfg
        # file bundled next to this module
        if '/' not in self.config_fname and '.' not in self.config_fname:
            cfgs_dir = os.path.dirname(__file__)
            self.config_fname = os.path.join(cfgs_dir,
                                             self.config_fname + '.cfg')

        self.alive_check_interval = self.options.get('alive_check_interval')

        self.config_params = self.options.get('params', {}).copy()
        self.tool = self.options.get('tool', 'fio')

        # remote file layout inside the test working directory
        self.io_py_remote = self.join_remote("agent.py")
        self.results_file = self.join_remote("results.json")
        self.pid_file = self.join_remote("pid")
        self.task_file = self.join_remote("task.cfg")
        self.use_sudo = self.options.get("use_sudo", True)
        self.test_logging = self.options.get("test_logging", False)

        # FIX: close the config file instead of leaking the handle
        with open(self.config_fname) as fd:
            self.raw_cfg = fd.read()

        self.fio_configs = fio_cfg_compile(self.raw_cfg,
                                           self.config_fname,
                                           self.config_params,
                                           split_on_names=self.test_logging)
        self.fio_configs = list(self.fio_configs)

    def __str__(self):
        return "{0}({1})".format(self.__class__.__name__,
                                 self.node.get_conn_id())

    @classmethod
    def load(cls, data):
        """Deserialize previously stored results (see IOTestResults)."""
        return IOTestResults.from_yaml(data)

    def cleanup(self):
        # delete_file(conn, self.io_py_remote)
        # Need to remove tempo files, used for testing
        pass

    def prefill_test_files(self):
        """Fill every data file used by the tests with zeroes via dd.

        If several sections share a file name, the largest requested size
        wins. Reports measured fill bandwidth to the coordinator.
        """
        files = {}
        for cfg_slice in self.fio_configs:
            for section in cfg_slice:
                sz = ssize2b(section.vals['size'])
                # FIX: explicit floor division — plain '/' yields a float
                # under python3 true division, breaking the dd count arg
                msz = sz // (1024 ** 2)

                # round partial MiB up
                if sz % (1024 ** 2) != 0:
                    msz += 1

                fname = section.vals['filename']

                # if already has other test with the same file name
                # take largest size
                files[fname] = max(files.get(fname, 0), msz)

        cmd_templ = "dd oflag=direct " + \
                    "if=/dev/zero of={0} bs={1} count={2}"

        if self.use_sudo:
            cmd_templ = "sudo " + cmd_templ

        ssize = 0
        stime = time.time()

        for fname, curr_sz in files.items():
            cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
            ssize += curr_sz
            # timeout heuristic: one second per MiB written
            self.run_over_ssh(cmd, timeout=curr_sz)

        ddtime = time.time() - stime
        if ddtime > 1E-3:
            fill_bw = int(ssize / ddtime)
            # FIX: typo in log message ("Initiall" -> "Initial")
            mess = "Initial dd fill bw is {0} MiBps for this vm"
            logger.info(mess.format(fill_bw))
            self.coordinate(('init_bw', fill_bw))

    def install_utils(self, max_retry=3, timeout=5):
        """Ensure fio and screen are installed on the remote node.

        Retries apt-get up to max_retry times, sleeping ``timeout``
        seconds between attempts; raises OSError if all attempts fail.
        """
        need_install = []
        for bin_name, package in (('fio', 'fio'), ('screen', 'screen')):
            try:
                self.run_over_ssh('which ' + bin_name, nolog=True)
            except OSError:
                need_install.append(package)

        if len(need_install) == 0:
            return

        cmd = "sudo apt-get -y install " + " ".join(need_install)

        # FIX: the original for/else referenced `err`, which is unbound
        # when the last attempt succeeds... and, under python3, the
        # except-clause variable is deleted when the handler exits, so
        # the else branch always raised NameError. Keep the last error
        # in a variable that survives the loop instead.
        last_err = None
        for i in range(max_retry):
            try:
                self.run_over_ssh(cmd)
                break
            except OSError as err:
                last_err = err
                time.sleep(timeout)
        else:
            raise OSError("Can't install - " + str(last_err))

    def pre_run(self):
        """Prepare the node: create the working dir, install utilities
        and (unless disabled) prefill the test files."""
        try:
            cmd = 'mkdir -p "{0}"'.format(self.remote_dir)
            if self.use_sudo:
                cmd = "sudo " + cmd
                # dir was created by root — hand it back to the ssh user
                cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
                                                      self.remote_dir)

            self.run_over_ssh(cmd)
        except Exception as exc:
            msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
            msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
            logger.exception(msg)
            raise StopTestError(msg, exc)

        self.install_utils()

        if self.options.get('prefill_files', True):
            self.prefill_test_files()
        elif self.is_primary:
            logger.warning("Prefilling of test files is disabled")

    def run(self, barrier):
        """Run all compiled config slices and report results via
        on_result_cb; ``barrier`` synchronizes all test nodes."""
        try:
            if len(self.fio_configs) > 1 and self.is_primary:

                exec_time = 0
                for test_slice in self.fio_configs:
                    exec_time += sum(map(execution_time, test_slice))

                # +10% - is a rough estimation for additional operations
                # like sftp, etc
                exec_time = int(exec_time * 1.1)

                exec_time_s = sec_to_str(exec_time)
                now_dt = datetime.datetime.now()
                end_dt = now_dt + datetime.timedelta(0, exec_time)
                # FIX: grammar in log message ("should takes aroud")
                msg = "Entire test should take around: {0} and finish at {1}"
                logger.info(msg.format(exec_time_s,
                                       end_dt.strftime("%H:%M:%S")))

            for pos, fio_cfg_slice in enumerate(self.fio_configs):
                fio_cfg_slice = list(fio_cfg_slice)
                names = [i.name for i in fio_cfg_slice]

                # build "name" or "name * count" messages, one per
                # distinct section name
                msgs = []
                already_processed = set()
                for name in names:
                    if name not in already_processed:
                        already_processed.add(name)

                        if 1 == names.count(name):
                            msgs.append(name)
                        else:
                            frmt = "{0} * {1}"
                            msgs.append(frmt.format(name,
                                                    names.count(name)))

                if self.is_primary:
                    logger.info("Will run tests: " + ", ".join(msgs))

                # only the primary node logs, and only for the first slice
                nolog = (pos != 0) or not self.is_primary
                out_err, interval = self.do_run(barrier, fio_cfg_slice, pos,
                                                nolog=nolog)

                try:
                    full_raw_res = json.loads(out_err)

                    res = {"bw": [], "iops": [], "lat": [],
                           "clat": [], "slat": []}

                    # one entry per fio job; 'mixed' aggregates r/w stats
                    for raw_result in full_raw_res['jobs']:
                        load_data = raw_result['mixed']

                        res["bw"].append(load_data["bw"])
                        res["iops"].append(load_data["iops"])
                        res["lat"].append(load_data["lat"]["mean"])
                        res["clat"].append(load_data["clat"]["mean"])
                        res["slat"].append(load_data["slat"]["mean"])

                    # all sections in a slice must be identical apart
                    # from ramp time
                    first = fio_cfg_slice[0]
                    p1 = first.vals.copy()
                    p1.pop('ramp_time', 0)

                    for nxt in fio_cfg_slice[1:]:
                        assert nxt.name == first.name
                        p2 = nxt.vals
                        # NOTE(review): key differs from the one popped
                        # from p1 ('ramp_time' vs '_ramp_time') — confirm
                        # this asymmetry is intentional
                        p2.pop('_ramp_time', 0)

                        assert p1 == p2

                    tres = IOTestResults(first,
                                         self.config_params, res,
                                         full_raw_res, interval,
                                         vm_count=self.total_nodes_count)
                    self.on_result_cb(tres)
                except (OSError, StopTestError):
                    raise
                except Exception as exc:
                    msg_templ = "Error during postprocessing results: {0!s}"
                    raise RuntimeError(msg_templ.format(exc))

        finally:
            barrier.exit()

    def do_run(self, barrier, cfg_slice, pos, nolog=False):
        """Execute one slice of fio sections on this node.

        Uploads the task file, waits on ``barrier`` so all nodes start
        together, runs fio in the background and downloads the json
        results plus any lat/iops log files.

        Returns (raw fio json output, (start_ts, end_ts)).
        """
        # return open("/tmp/lit-sunshine/io/results.json").read(), (1, 2)
        conn_id = self.node.get_conn_id()
        # conn id is used in local file names — make it path-safe
        fconn_id = conn_id.replace(":", "_")

        cmd_templ = "fio --output-format=json --output={1} " + \
                    "--alloc-size=262144 {0}"

        if self.options.get("use_sudo", True):
            cmd_templ = "sudo " + cmd_templ

        task_fc = "\n\n".join(map(str, cfg_slice))
        with self.node.connection.open_sftp() as sftp:
            save_to_remote(sftp, self.task_file, task_fc)

        # keep a local copy of the generated task file for debugging
        fname = "{0}_{1}.fio".format(pos, fconn_id)
        with open(os.path.join(self.log_directory, fname), "w") as fd:
            fd.write(task_fc)

        cmd = cmd_templ.format(self.task_file, self.results_file)

        exec_time = sum(map(execution_time, cfg_slice))
        exec_time_str = sec_to_str(exec_time)

        # hard timeout: planned time plus at least 5 minutes of slack
        # (at most 2x the planned time)
        timeout = int(exec_time + max(300, exec_time))
        soft_tout = exec_time
        barrier.wait()

        if self.is_primary:
            # FIX: grammar in log message ("should takes")
            templ = "Test should take about {0}." + \
                    " Should finish at {1}," + \
                    " will wait at most till {2}"
            now_dt = datetime.datetime.now()
            end_dt = now_dt + datetime.timedelta(0, exec_time)
            wait_till = now_dt + datetime.timedelta(0, timeout)

            logger.info(templ.format(exec_time_str,
                                     end_dt.strftime("%H:%M:%S"),
                                     wait_till.strftime("%H:%M:%S")))

        self.run_over_ssh("cd " + os.path.dirname(self.task_file), nolog=True)
        task = BGSSHTask(self.node, self.options.get("use_sudo", True))
        begin = time.time()
        task.start(cmd)
        task.wait(soft_tout, timeout)
        end = time.time()

        if not nolog:
            logger.debug("Test on node {0} is finished".format(conn_id))

        # collect names of optional fio latency/iops log files
        log_files = set()
        for cfg in cfg_slice:
            if 'write_lat_log' in cfg.vals:
                fname = cfg.vals['write_lat_log']
                log_files.add(fname + '_clat.log')
                log_files.add(fname + '_lat.log')
                log_files.add(fname + '_slat.log')

            if 'write_iops_log' in cfg.vals:
                fname = cfg.vals['write_iops_log']
                log_files.add(fname + '_iops.log')

        with self.node.connection.open_sftp() as sftp:
            result = read_from_remote(sftp, self.results_file)
            sftp.remove(self.results_file)

            fname = "{0}_{1}.json".format(pos, fconn_id)
            with open(os.path.join(self.log_directory, fname), "w") as fd:
                fd.write(result)

            for fname in log_files:
                try:
                    fc = read_from_remote(sftp, fname)
                except Exception:
                    # FIX: was a bare 'except:'; fetching optional fio
                    # logs is best-effort, so skip missing ones but do
                    # not swallow KeyboardInterrupt/SystemExit
                    continue
                sftp.remove(fname)

                loc_fname = "{0}_{1}_{2}".format(pos, fconn_id,
                                                 fname.split('_')[-1])
                loc_path = os.path.join(self.log_directory, loc_fname)
                with open(loc_path, "w") as fd:
                    fd.write(fc)

        return result, (begin, end)

    @classmethod
    def merge_results(cls, results):
        """Merge per-node result dicts into one; all parts must share
        the same __meta__."""
        merged = results[0]
        for block in results[1:]:
            assert block["__meta__"] == merged["__meta__"]
            merged['res'].extend(block['res'])
        return merged

    @classmethod
    def format_for_console(cls, data, dinfo):
        """Render processed results (dinfo) as a console table."""
        return format_results_for_console(dinfo)