blob: c5615bbff8ccd4747193923aeb8e2626948cfc20 [file] [log] [blame]
koder aka kdanilov4643fd62015-02-10 16:20:13 -08001import abc
koder aka kdanilov66839a92015-04-11 13:22:31 +03002import time
koder aka kdanilov4643fd62015-02-10 16:20:13 -08003import os.path
koder aka kdanilove21d7472015-02-14 19:02:04 -08004import logging
5
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +03006from wally.ssh_utils import copy_paths, run_over_ssh, delete_file
7from wally.utils import ssize_to_b, open_for_append_or_create, sec_to_str
8
9from . import postgres
10from .io import agent as io_agent
11from .io import formatter as io_formatter
12from .io.results_loader import parse_output
koder aka kdanilov652cd802015-04-13 12:21:07 +030013
koder aka kdanilov4643fd62015-02-10 16:20:13 -080014
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030015logger = logging.getLogger("wally")
koder aka kdanilove21d7472015-02-14 19:02:04 -080016
17
koder aka kdanilov4643fd62015-02-10 16:20:13 -080018class IPerfTest(object):
koder aka kdanilov4500a5f2015-04-17 16:55:17 +030019 def __init__(self, on_result_cb, log_directory=None, node=None):
koder aka kdanilov4643fd62015-02-10 16:20:13 -080020 self.on_result_cb = on_result_cb
koder aka kdanilov4500a5f2015-04-17 16:55:17 +030021 self.log_directory = log_directory
22 self.node = node
koder aka kdanilov4643fd62015-02-10 16:20:13 -080023
koder aka kdanilov4643fd62015-02-10 16:20:13 -080024 def pre_run(self, conn):
25 pass
26
koder aka kdanilov4500a5f2015-04-17 16:55:17 +030027 def cleanup(self, conn):
28 pass
29
koder aka kdanilov4643fd62015-02-10 16:20:13 -080030 @abc.abstractmethod
koder aka kdanilov2c473092015-03-29 17:12:13 +030031 def run(self, conn, barrier):
koder aka kdanilov4643fd62015-02-10 16:20:13 -080032 pass
33
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030034 @classmethod
35 def format_for_console(cls, data):
36 msg = "{0}.format_for_console".format(cls.__name__)
37 raise NotImplementedError(msg)
38
koder aka kdanilov4643fd62015-02-10 16:20:13 -080039
Yulia Portnova7ddfa732015-02-24 17:32:58 +020040class TwoScriptTest(IPerfTest):
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030041 remote_tmp_dir = '/tmp'
42
koder aka kdanilov4500a5f2015-04-17 16:55:17 +030043 def __init__(self, opts, on_result_cb, log_directory=None, node=None):
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030044 IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
Yulia Portnova7ddfa732015-02-24 17:32:58 +020045 self.opts = opts
Yulia Portnova7ddfa732015-02-24 17:32:58 +020046
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030047 if 'run_script' in self.opts:
48 self.run_script = self.opts['run_script']
49 self.prepare_script = self.opts['prepare_script']
Yulia Portnova7ddfa732015-02-24 17:32:58 +020050
51 def get_remote_for_script(self, script):
52 return os.path.join(self.tmp_dir, script.rpartition('/')[2])
53
54 def copy_script(self, conn, src):
55 remote_path = self.get_remote_for_script(src)
56 copy_paths(conn, {src: remote_path})
57 return remote_path
58
59 def pre_run(self, conn):
60 remote_script = self.copy_script(conn, self.pre_run_script)
61 cmd = remote_script
koder aka kdanilov4500a5f2015-04-17 16:55:17 +030062 run_over_ssh(conn, cmd, node=self.node)
Yulia Portnova7ddfa732015-02-24 17:32:58 +020063
koder aka kdanilov2c473092015-03-29 17:12:13 +030064 def run(self, conn, barrier):
Yulia Portnova7ddfa732015-02-24 17:32:58 +020065 remote_script = self.copy_script(conn, self.run_script)
Yulia Portnova886a2562015-04-07 11:16:13 +030066 cmd_opts = ' '.join(["%s %s" % (key, val) for key, val
67 in self.opts.items()])
68 cmd = remote_script + ' ' + cmd_opts
koder aka kdanilov4500a5f2015-04-17 16:55:17 +030069 out_err = run_over_ssh(conn, cmd, node=self.node)
koder aka kdanilov66839a92015-04-11 13:22:31 +030070 self.on_result(out_err, cmd)
Yulia Portnova7ddfa732015-02-24 17:32:58 +020071
72 def parse_results(self, out):
73 for line in out.split("\n"):
74 key, separator, value = line.partition(":")
75 if key and value:
76 self.on_result_cb((key, float(value)))
77
koder aka kdanilov66839a92015-04-11 13:22:31 +030078 def on_result(self, out_err, cmd):
79 try:
80 self.parse_results(out_err)
81 except Exception as exc:
82 msg_templ = "Error during postprocessing results: {0!r}. {1}"
83 raise RuntimeError(msg_templ.format(exc.message, out_err))
Yulia Portnova7ddfa732015-02-24 17:32:58 +020084
85
86class PgBenchTest(TwoScriptTest):
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030087 root = os.path.dirname(postgres.__file__)
88 prepare_script = os.path.join(root, "prepare.sh")
89 run_script = os.path.join(root, "run.sh")
koder aka kdanilov4500a5f2015-04-17 16:55:17 +030090
91
koder aka kdanilov4643fd62015-02-10 16:20:13 -080092class IOPerfTest(IPerfTest):
koder aka kdanilovda45e882015-04-06 02:24:42 +030093 io_py_remote = "/tmp/disk_test_agent.py"
koder aka kdanilov2c473092015-03-29 17:12:13 +030094
koder aka kdanilov4500a5f2015-04-17 16:55:17 +030095 def __init__(self, test_options, on_result_cb,
96 log_directory=None, node=None):
97 IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
koder aka kdanilov2c473092015-03-29 17:12:13 +030098 self.options = test_options
koder aka kdanilovda45e882015-04-06 02:24:42 +030099 self.config_fname = test_options['cfg']
koder aka kdanilov4500a5f2015-04-17 16:55:17 +0300100 self.alive_check_interval = test_options.get('alive_check_interval')
koder aka kdanilovda45e882015-04-06 02:24:42 +0300101 self.config_params = test_options.get('params', {})
102 self.tool = test_options.get('tool', 'fio')
103 self.raw_cfg = open(self.config_fname).read()
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +0300104 self.configs = list(io_agent.parse_fio_config_full(self.raw_cfg,
105 self.config_params))
koder aka kdanilov4643fd62015-02-10 16:20:13 -0800106
koder aka kdanilov4500a5f2015-04-17 16:55:17 +0300107 cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
108 raw_res = os.path.join(self.log_directory, "raw_results.txt")
koder aka kdanilovda45e882015-04-06 02:24:42 +0300109
koder aka kdanilov4500a5f2015-04-17 16:55:17 +0300110 fio_command_file = open_for_append_or_create(cmd_log)
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +0300111 fio_command_file.write(io_agent.compile(self.raw_cfg,
112 self.config_params,
113 None))
koder aka kdanilov4500a5f2015-04-17 16:55:17 +0300114 self.fio_raw_results_file = open_for_append_or_create(raw_res)
115
116 def cleanup(self, conn):
117 delete_file(conn, self.io_py_remote)
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +0300118 # Need to remove tempo files, used for testing
koder aka kdanilov4500a5f2015-04-17 16:55:17 +0300119
120 def pre_run(self, conn):
koder aka kdanilov652cd802015-04-13 12:21:07 +0300121 try:
koder aka kdanilov4500a5f2015-04-17 16:55:17 +0300122 run_over_ssh(conn, 'which fio', node=self.node)
koder aka kdanilov652cd802015-04-13 12:21:07 +0300123 except OSError:
124 # TODO: install fio, if not installed
125 cmd = "sudo apt-get -y install fio"
koder aka kdanilov66839a92015-04-11 13:22:31 +0300126
koder aka kdanilov652cd802015-04-13 12:21:07 +0300127 for i in range(3):
128 try:
koder aka kdanilov4500a5f2015-04-17 16:55:17 +0300129 run_over_ssh(conn, cmd, node=self.node)
koder aka kdanilov652cd802015-04-13 12:21:07 +0300130 break
131 except OSError as err:
132 time.sleep(3)
133 else:
134 raise OSError("Can't install fio - " + err.message)
koder aka kdanilovda45e882015-04-06 02:24:42 +0300135
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +0300136 local_fname = io_agent.__file__.rsplit('.')[0] + ".py"
koder aka kdanilov2c473092015-03-29 17:12:13 +0300137 self.files_to_copy = {local_fname: self.io_py_remote}
koder aka kdanilov50f18642015-02-11 08:54:44 -0800138 copy_paths(conn, self.files_to_copy)
koder aka kdanilov4643fd62015-02-10 16:20:13 -0800139
koder aka kdanilove87ae652015-04-20 02:14:35 +0300140 if self.options.get('prefill_files', True):
141 files = {}
koder aka kdanilov652cd802015-04-13 12:21:07 +0300142
koder aka kdanilove87ae652015-04-20 02:14:35 +0300143 for secname, params in self.configs:
144 sz = ssize_to_b(params['size'])
145 msz = sz / (1024 ** 2)
146 if sz % (1024 ** 2) != 0:
147 msz += 1
koder aka kdanilov6e2ae792015-03-04 18:02:24 -0800148
koder aka kdanilove87ae652015-04-20 02:14:35 +0300149 fname = params['filename']
koder aka kdanilov652cd802015-04-13 12:21:07 +0300150
koder aka kdanilove87ae652015-04-20 02:14:35 +0300151 # if already has other test with the same file name
152 # take largest size
153 files[fname] = max(files.get(fname, 0), msz)
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +0300154
koder aka kdanilove87ae652015-04-20 02:14:35 +0300155 # logger.warning("dd run DISABLED")
156 # cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
157
158 cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
159 ssize = 0
160 stime = time.time()
161
162 for fname, curr_sz in files.items():
163 cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
164 ssize += curr_sz
165 run_over_ssh(conn, cmd, timeout=curr_sz, node=self.node)
166
167 ddtime = time.time() - stime
168 if ddtime > 1E-3:
169 fill_bw = int(ssize / ddtime)
170 mess = "Initiall dd fill bw is {0} MiBps for this vm"
171 logger.info(mess.format(fill_bw))
172 else:
173 logger.warning("Test files prefill disabled")
koder aka kdanilov6e2ae792015-03-04 18:02:24 -0800174
koder aka kdanilov2c473092015-03-29 17:12:13 +0300175 def run(self, conn, barrier):
koder aka kdanilov168f6092015-04-19 02:33:38 +0300176 # logger.warning("No tests runned")
177 # return
koder aka kdanilove87ae652015-04-20 02:14:35 +0300178 cmd_templ = "sudo env python2 {0} {3} --type {1} {2} --json -"
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +0300179 # cmd_templ = "env python2 {0} --type {1} {2} --json -"
koder aka kdanilov66839a92015-04-11 13:22:31 +0300180
181 params = " ".join("{0}={1}".format(k, v)
182 for k, v in self.config_params.items())
183
184 if "" != params:
185 params = "--params " + params
186
koder aka kdanilove87ae652015-04-20 02:14:35 +0300187 if self.options.get('cluster', False):
188 logger.info("Cluster mode is used")
189 cluster_opt = "--cluster"
190 else:
191 logger.info("Non-cluster mode is used")
192 cluster_opt = ""
193
194 cmd = cmd_templ.format(self.io_py_remote, self.tool, params,
195 cluster_opt)
koder aka kdanilov66839a92015-04-11 13:22:31 +0300196 logger.debug("Waiting on barrier")
koder aka kdanilov652cd802015-04-13 12:21:07 +0300197
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +0300198 exec_time = io_agent.estimate_cfg(self.raw_cfg, self.config_params)
koder aka kdanilov652cd802015-04-13 12:21:07 +0300199 exec_time_str = sec_to_str(exec_time)
200
koder aka kdanilov2c473092015-03-29 17:12:13 +0300201 try:
koder aka kdanilove87ae652015-04-20 02:14:35 +0300202 timeout = int(exec_time * 1.2 + 300)
koder aka kdanilov652cd802015-04-13 12:21:07 +0300203 if barrier.wait():
koder aka kdanilov4500a5f2015-04-17 16:55:17 +0300204 templ = "Test should takes about {0}. Will wait at most {1}"
koder aka kdanilov4500a5f2015-04-17 16:55:17 +0300205 logger.info(templ.format(exec_time_str, sec_to_str(timeout)))
koder aka kdanilov652cd802015-04-13 12:21:07 +0300206
207 out_err = run_over_ssh(conn, cmd,
208 stdin_data=self.raw_cfg,
koder aka kdanilov4500a5f2015-04-17 16:55:17 +0300209 timeout=timeout,
210 node=self.node)
koder aka kdanilov12ae0632015-04-15 01:13:43 +0300211 logger.info("Done")
koder aka kdanilov2c473092015-03-29 17:12:13 +0300212 finally:
213 barrier.exit()
214
koder aka kdanilov652cd802015-04-13 12:21:07 +0300215 self.on_result(out_err, cmd)
216
koder aka kdanilov66839a92015-04-11 13:22:31 +0300217 def on_result(self, out_err, cmd):
218 try:
219 for data in parse_output(out_err):
220 self.on_result_cb(data)
221 except Exception as exc:
222 msg_templ = "Error during postprocessing results: {0!r}"
223 raise RuntimeError(msg_templ.format(exc.message))
224
225 def merge_results(self, results):
koder aka kdanilov4500a5f2015-04-17 16:55:17 +0300226 if len(results) == 0:
227 return None
228
koder aka kdanilov66839a92015-04-11 13:22:31 +0300229 merged_result = results[0]
230 merged_data = merged_result['res']
231 expected_keys = set(merged_data.keys())
koder aka kdanilov4e9f3ed2015-04-14 11:26:12 +0300232 mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']
koder aka kdanilov66839a92015-04-11 13:22:31 +0300233
234 for res in results[1:]:
235 assert res['__meta__'] == merged_result['__meta__']
236
237 data = res['res']
238 diff = set(data.keys()).symmetric_difference(expected_keys)
239
240 msg = "Difference: {0}".format(",".join(diff))
241 assert len(diff) == 0, msg
242
243 for testname, test_data in data.items():
244 res_test_data = merged_data[testname]
245
246 diff = set(test_data.keys()).symmetric_difference(
247 res_test_data.keys())
248
249 msg = "Difference: {0}".format(",".join(diff))
250 assert len(diff) == 0, msg
251
252 for k, v in test_data.items():
253 if k in mergable_fields:
254 res_test_data[k].extend(v)
255 else:
256 msg = "{0!r} != {1!r}".format(res_test_data[k], v)
257 assert res_test_data[k] == v, msg
258
259 return merged_result
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +0300260
261 @classmethod
262 def format_for_console(cls, data):
263 return io_formatter.format_results_for_console(data)