# blob: 4828850988a7ee5684d23eb1cb53226fafaefc55
# blame: koder aka kdanilov, 2015-05-18 15:48:58 +0300
import time
2import json
3import os.path
4import logging
5import datetime
6
7from wally.utils import (ssize2b, open_for_append_or_create,
8 sec_to_str, StopTestError)
9
10from wally.ssh_utils import save_to_remote, read_from_remote, BGSSHTask
11
12from ..itest import IPerfTest, TestResults
13from .formatter import format_results_for_console
14from .fio_task_parser import (execution_time, fio_cfg_compile,
15 get_test_summary, FioJobSection)
16
17
18logger = logging.getLogger("wally")
19
20
class IOTestResults(TestResults):
    """Holds the outcome of one fio test run and its (de)serialization.

    ``get_yamable``/``from_yaml`` are inverses of each other: the first
    flattens the result to plain structures suitable for YAML storage,
    the second rebuilds an instance from such data.
    """

    def summary(self):
        """Return a short text id: per-config summary plus a vm-count suffix."""
        return "{0}vm{1}".format(get_test_summary(self.config), self.vm_count)

    def get_yamable(self):
        """Serialize this result into plain dict/tuple/list structures."""
        cfg = self.config
        return dict(type="fio_test",
                    params=self.params,
                    config=(cfg.name, cfg.vals),
                    results=self.results,
                    raw_result=self.raw_result,
                    run_interval=self.run_interval,
                    vm_count=self.vm_count)

    @classmethod
    def from_yaml(cls, data):
        """Rebuild an instance from data produced by get_yamable."""
        section_name, section_vals = data['config']
        section = FioJobSection(section_name)
        section.vals = section_vals
        return cls(section,
                   data['params'],
                   data['results'],
                   data['raw_result'],
                   data['run_interval'],
                   data['vm_count'])
45
46
def get_slice_parts_offset(test_slice, real_inteval):
    """Yield a (start, end) time pair for every section of *test_slice*.

    The calculated per-section execution times are rescaled so that the
    whole slice exactly covers the measured wall-clock interval
    ``real_inteval`` (a (begin, end) pair).
    """
    begin = real_inteval[0]
    finish = real_inteval[1]
    expected_total = sum(execution_time(section) for section in test_slice)
    scale = (finish - begin) / expected_total
    pos = begin
    for section in test_slice:
        duration = execution_time(section) * scale
        yield (pos, pos + duration)
        pos += duration
55
56
class IOPerfTest(IPerfTest):
    """Run fio-based disk IO tests on a remote node over SSH.

    Lifecycle (driven by the framework): __init__ compiles fio job
    configs from a local .cfg file; pre_run prepares the remote dir,
    installs missing tools and optionally prefills test files; run
    executes each compiled config slice remotely via do_run, parses the
    fio JSON output and reports IOTestResults through on_result_cb.
    """

    # TCP connection timeout, seconds (not referenced within this chunk)
    tcp_conn_timeout = 30
    max_pig_timeout = 5
    # soft run-cycle length: 5 minutes, in seconds
    soft_runcycle = 5 * 60

    def __init__(self, *dt, **mp):
        """Read options, compile fio configs and open local log files.

        All settings come from self.options (set up by IPerfTest.__init__):
        'cfg', 'alive_check_interval', 'params', 'tool', 'use_sudo',
        'test_logging', and more read later by other methods.
        """
        IPerfTest.__init__(self, *dt, **mp)
        self.config_fname = self.options['cfg']

        # A bare name (no path separator, no extension) refers to one of
        # the .cfg files shipped next to this module.
        if '/' not in self.config_fname and '.' not in self.config_fname:
            cfgs_dir = os.path.dirname(__file__)
            self.config_fname = os.path.join(cfgs_dir,
                                             self.config_fname + '.cfg')

        self.alive_check_interval = self.options.get('alive_check_interval')

        # Copy so later mutation of params does not leak back into options.
        self.config_params = self.options.get('params', {}).copy()
        self.tool = self.options.get('tool', 'fio')

        # Local sink for raw fio output collected during the run.
        raw_res = os.path.join(self.log_directory, "raw_results.txt")
        self.fio_raw_results_file = open_for_append_or_create(raw_res)

        # Remote file locations, all under the per-test remote directory.
        self.io_py_remote = self.join_remote("agent.py")
        self.results_file = self.join_remote("results.json")
        self.pid_file = self.join_remote("pid")
        self.task_file = self.join_remote("task.cfg")
        self.use_sudo = self.options.get("use_sudo", True)
        self.test_logging = self.options.get("test_logging", False)
        self.raw_cfg = open(self.config_fname).read()
        # Compile the raw cfg text into a list of job-section slices;
        # fio_cfg_compile returns an iterable, materialized below.
        self.fio_configs = fio_cfg_compile(self.raw_cfg,
                                           self.config_fname,
                                           self.config_params,
                                           split_on_names=self.test_logging)
        self.fio_configs = list(self.fio_configs)

        # Dump the compiled configs locally for later inspection/debugging.
        cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
        fio_command_file = open_for_append_or_create(cmd_log)
        splitter = "\n\n" + "-" * 60 + "\n\n"
        fio_command_file.write(splitter.join(map(str, self.fio_configs)))

    def __str__(self):
        """Short repr: class name plus the SSH connection id of the node."""
        return "{0}({1})".format(self.__class__.__name__,
                                 self.node.get_conn_id())

    @classmethod
    def load(cls, data):
        """Deserialize a previously stored result (see IOTestResults)."""
        return IOTestResults.from_yaml(data)

    def cleanup(self):
        # delete_file(conn, self.io_py_remote)
        # TODO: remove temporary remote files created during the test.
        pass

    def prefill_test_files(self):
        """Pre-create/fill every file the fio jobs will use, via remote dd.

        Collapses duplicate filenames across all config sections, keeping
        the largest requested size for each file, then writes each file
        with 1 MiB blocks and reports the achieved fill bandwidth.
        """
        files = {}
        for cfg_slice in self.fio_configs:
            for section in cfg_slice:
                sz = ssize2b(section.vals['size'])
                # Size in MiB, rounded up to a whole block.
                # NOTE(review): relies on Python 2 integer division;
                # under Python 3 this yields a float dd count — confirm.
                msz = sz / (1024 ** 2)

                if sz % (1024 ** 2) != 0:
                    msz += 1

                fname = section.vals['filename']

                # if already has other test with the same file name
                # take largest size
                files[fname] = max(files.get(fname, 0), msz)

        cmd_templ = "dd oflag=direct " + \
                    "if=/dev/zero of={0} bs={1} count={2}"

        if self.use_sudo:
            cmd_templ = "sudo " + cmd_templ

        ssize = 0
        stime = time.time()

        for fname, curr_sz in files.items():
            cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
            ssize += curr_sz
            # timeout in seconds == size in MiB, i.e. assumes >= 1 MiBps.
            self.run_over_ssh(cmd, timeout=curr_sz)

        ddtime = time.time() - stime
        if ddtime > 1E-3:
            # MiB / seconds => MiBps, shared with peers via coordinate().
            fill_bw = int(ssize / ddtime)
            mess = "Initiall dd fill bw is {0} MiBps for this vm"
            logger.info(mess.format(fill_bw))
            self.coordinate(('init_bw', fill_bw))

    def install_utils(self, max_retry=3, timeout=5):
        """Ensure 'fio' and 'screen' are installed on the remote node.

        Probes with `which`; anything missing is installed through
        apt-get (Debian/Ubuntu assumed), retrying up to *max_retry*
        times with *timeout* seconds between attempts.
        """
        need_install = []
        for bin_name, package in (('fio', 'fio'), ('screen', 'screen')):
            try:
                self.run_over_ssh('which ' + bin_name, nolog=True)
            except OSError:
                # `which` failed => binary missing, schedule the package.
                need_install.append(package)

        if len(need_install) == 0:
            return

        cmd = "sudo apt-get -y install " + " ".join(need_install)

        for i in range(max_retry):
            try:
                self.run_over_ssh(cmd)
                break
            except OSError as err:
                time.sleep(timeout)
        else:
            # for/else: reached only when every attempt failed (no break).
            # NOTE(review): `err` here relies on Python 2 except-scoping;
            # under Python 3 it would be unbound — confirm target runtime.
            raise OSError("Can't install - " + str(err))

    def pre_run(self):
        """Prepare the remote node: working dir, utilities, test files.

        Raises:
            StopTestError: if the remote directory cannot be created.
        """
        try:
            cmd = 'mkdir -p "{0}"'.format(self.remote_dir)
            if self.use_sudo:
                cmd = "sudo " + cmd
                # Hand the dir back to the SSH user so later non-sudo
                # sftp/file operations work.
                cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
                                                      self.remote_dir)

            self.run_over_ssh(cmd)
        except Exception as exc:
            msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
            msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
            logger.exception(msg)
            raise StopTestError(msg, exc)

        self.install_utils()

        if self.options.get('prefill_files', True):
            self.prefill_test_files()
        elif self.is_primary:
            logger.warning("Prefilling of test files is disabled")

    def run(self, barrier):
        """Execute every compiled fio config slice and report results.

        Args:
            barrier: cross-node synchronization object; exited in the
                     `finally` so peers are never left waiting.
        Raises:
            RuntimeError: when fio output postprocessing fails.
        """
        try:
            if len(self.fio_configs) > 1 and self.is_primary:

                # Primary node announces a rough ETA for the whole run.
                exec_time = 0
                for test_slice in self.fio_configs:
                    exec_time += sum(map(execution_time, test_slice))

                # +10% - is a rough estimation for additional operations
                # like sftp, etc
                exec_time = int(exec_time * 1.1)

                exec_time_s = sec_to_str(exec_time)
                now_dt = datetime.datetime.now()
                end_dt = now_dt + datetime.timedelta(0, exec_time)
                msg = "Entire test should takes aroud: {0} and finished at {1}"
                logger.info(msg.format(exec_time_s,
                                       end_dt.strftime("%H:%M:%S")))

            for pos, fio_cfg_slice in enumerate(self.fio_configs):
                fio_cfg_slice = list(fio_cfg_slice)
                names = [i.name for i in fio_cfg_slice]
                msgs = []
                # Deduplicate job names for logging, showing "name * N"
                # when the same job appears several times in the slice.
                already_processed = set()
                for name in names:
                    if name not in already_processed:
                        already_processed.add(name)

                        if 1 == names.count(name):
                            msgs.append(name)
                        else:
                            frmt = "{0} * {1}"
                            msgs.append(frmt.format(name,
                                                    names.count(name)))

                if self.is_primary:
                    logger.info("Will run tests: " + ", ".join(msgs))

                # Only the primary node's first slice is logged verbosely.
                nolog = (pos != 0) or not self.is_primary
                out_err, interval = self.do_run(barrier, fio_cfg_slice,
                                                nolog=nolog)

                try:
                    # fio was run with --output-format=json.
                    full_raw_res = json.loads(out_err)

                    res = {"bw": [], "iops": [], "lat": [],
                           "clat": [], "slat": []}

                    # One entry per fio job; 'mixed' aggregates r/w stats.
                    for raw_result in full_raw_res['jobs']:
                        load_data = raw_result['mixed']

                        res["bw"].append(load_data["bw"])
                        res["iops"].append(load_data["iops"])
                        res["lat"].append(load_data["lat"]["mean"])
                        res["clat"].append(load_data["clat"]["mean"])
                        res["slat"].append(load_data["slat"]["mean"])

                    # Sanity check: all sections in a slice must be the
                    # same job, ignoring ramp_time.
                    first = fio_cfg_slice[0]
                    p1 = first.vals.copy()
                    p1.pop('ramp_time', 0)

                    for nxt in fio_cfg_slice[1:]:
                        assert nxt.name == first.name
                        p2 = nxt.vals
                        # NOTE(review): p1 pops 'ramp_time' but p2 pops
                        # '_ramp_time' — looks inconsistent; confirm which
                        # key fio_task_parser actually stores.
                        p2.pop('_ramp_time', 0)

                        assert p1 == p2

                    tres = IOTestResults(first,
                                        self.config_params, res,
                                        full_raw_res, interval,
                                        vm_count=self.total_nodes_count)
                    self.on_result_cb(tres)
                except (OSError, StopTestError):
                    # Framework-level failures propagate untouched.
                    raise
                except Exception as exc:
                    msg_templ = "Error during postprocessing results: {0!s}"
                    raise RuntimeError(msg_templ.format(exc))

        finally:
            barrier.exit()

    def do_run(self, barrier, cfg_slice, nolog=False):
        """Run one config slice remotely; return (raw fio output, interval).

        Uploads the job file over SFTP, waits on the barrier so all nodes
        start together, runs fio in a background SSH task and reads the
        JSON results back. *interval* is the (begin, end) wall-clock pair.
        """
        # debug stub, kept for local testing:
        # return open("/tmp/lit-sunshine/io/results.json").read(), (1, 2)
        conn_id = self.node.get_conn_id()

        cmd_templ = "fio --output-format=json --output={1} " + \
                    "--alloc-size=262144 {0}"

        if self.options.get("use_sudo", True):
            cmd_templ = "sudo " + cmd_templ

        task_fc = "\n\n".join(map(str, cfg_slice))
        with self.node.connection.open_sftp() as sftp:
            save_to_remote(sftp, self.task_file, task_fc)

        cmd = cmd_templ.format(self.task_file, self.results_file)

        exec_time = sum(map(execution_time, cfg_slice))
        exec_time_str = sec_to_str(exec_time)

        # Hard timeout: expected time plus max(5 min, expected time),
        # i.e. between exec_time+300 and 2*exec_time.
        timeout = int(exec_time + max(300, exec_time))
        soft_tout = exec_time
        # Synchronize the start across all participating nodes.
        barrier.wait()

        if self.is_primary:
            templ = "Test should takes about {0}." + \
                    " Should finish at {1}," + \
                    " will wait at most till {2}"
            now_dt = datetime.datetime.now()
            end_dt = now_dt + datetime.timedelta(0, exec_time)
            wait_till = now_dt + datetime.timedelta(0, timeout)

            logger.info(templ.format(exec_time_str,
                                     end_dt.strftime("%H:%M:%S"),
                                     wait_till.strftime("%H:%M:%S")))

        task = BGSSHTask(self.node, self.options.get("use_sudo", True))
        begin = time.time()
        task.start(cmd)
        task.wait(soft_tout, timeout)
        end = time.time()

        if not nolog:
            logger.debug("Test on node {0} is finished".format(conn_id))

        with self.node.connection.open_sftp() as sftp:
            return read_from_remote(sftp, self.results_file), (begin, end)

    @classmethod
    def merge_results(cls, results):
        """Merge per-node result dicts with identical '__meta__' into one."""
        merged = results[0]
        for block in results[1:]:
            assert block["__meta__"] == merged["__meta__"]
            merged['res'].extend(block['res'])
        return merged

    @classmethod
    def format_for_console(cls, data, dinfo):
        """Render processed results (*dinfo*) as a console table."""
        return format_results_for_console(dinfo)