# blob: 605df2c9212804b9259077ed69825d5f58cec705 [file] [log] [blame]
import abc
import time
import socket
import random
import os.path
import logging
import datetime
from paramiko import SSHException, SFTPError
from wally.utils import (ssize_to_b, open_for_append_or_create,
sec_to_str, StopTestError)
from wally.ssh_utils import (copy_paths, run_over_ssh,
save_to_remote,
# delete_file,
connect, read_from_remote, Local)
from . import postgres
from .io import agent as io_agent
from .io import formatter as io_formatter
from .io.results_loader import parse_output
# Module-level logger shared by every test class below; all messages are
# emitted through the logger named "wally".
logger = logging.getLogger("wally")
class IPerfTest(object):
    """Base class for performance tests executed against a single node.

    Stores the test options, the node handle and the result callback, and
    offers small helpers (remote path building, coordination messages,
    command execution over SSH) that concrete tests build upon.

    NOTE: ``run`` is decorated with ``abc.abstractmethod``, but this class
    derives from plain ``object`` (no ``ABCMeta`` metaclass), so the
    decorator is not enforced at instantiation time.
    """

    def __init__(self, options, is_primary, on_result_cb, test_uuid, node,
                 log_directory=None,
                 coordination_queue=None,
                 remote_dir="/tmp/wally"):
        # What to run and where.
        self.options = options
        self.node = node
        self.test_uuid = test_uuid
        self.is_primary = is_primary

        # Where results, logs and coordination messages go.
        self.on_result_cb = on_result_cb
        self.log_directory = log_directory
        self.coordination_queue = coordination_queue
        self.remote_dir = remote_dir

        # Flipped by request_stop().
        self.stop_requested = False

    def request_stop(self):
        """Record that a stop of this test was requested."""
        self.stop_requested = True

    def join_remote(self, path):
        """Return ``path`` joined onto the remote working directory."""
        return os.path.join(self.remote_dir, path)

    def coordinate(self, data):
        """Put ``(node connection id, data)`` on the coordination queue.

        Does nothing when no coordination queue was supplied.
        """
        if self.coordination_queue is None:
            return
        message = (self.node.get_conn_id(), data)
        self.coordination_queue.put(message)

    def pre_run(self):
        """Hook executed before ``run``; the base implementation is a no-op."""
        pass

    def cleanup(self):
        """Hook for post-test cleanup; the base implementation is a no-op."""
        pass

    @abc.abstractmethod
    def run(self, barrier):
        """Execute the test; concrete subclasses provide the body."""
        pass

    @classmethod
    def format_for_console(cls, data):
        """Format results for console output; must be overridden.

        Raises:
            NotImplementedError: always, naming the class that lacks the
                override.
        """
        raise NotImplementedError(
            "{0}.format_for_console".format(cls.__name__))

    def run_over_ssh(self, cmd, **kwargs):
        """Run ``cmd`` on this test's node via the module-level helper.

        Extra keyword arguments are forwarded unchanged.
        """
        conn_id = self.node.get_conn_id()
        return run_over_ssh(self.node.connection, cmd, node=conn_id, **kwargs)

    @classmethod
    def coordination_th(cls, coord_q, barrier, num_threads):
        """Coordination-thread hook; the base implementation is a no-op."""
        pass
class TwoScriptTest(IPerfTest):
    """Test made of two scripts: a preparation script and a run script.

    Both scripts are copied into the remote working directory and executed
    over SSH. The run script's output is parsed as ``key:value`` lines and
    each pair is passed to ``on_result_cb``.

    The script paths come either from the ``run_script`` / ``prepare_script``
    options or from class attributes defined by a subclass.
    """

    def __init__(self, *dt, **mp):
        IPerfTest.__init__(self, *dt, **mp)

        # Options override any class-level script paths. When 'run_script'
        # is given, 'prepare_script' is required as well (KeyError otherwise).
        if 'run_script' in self.options:
            self.run_script = self.options['run_script']
            self.prepare_script = self.options['prepare_script']

    def get_remote_for_script(self, script):
        """Return the remote path for ``script``: remote dir + base name."""
        # Fixed: the original read self.tmp_dir, which is never set; the
        # base class stores the remote working directory in remote_dir.
        return os.path.join(self.remote_dir, script.rpartition('/')[2])

    def copy_script(self, src):
        """Copy local script ``src`` to the node and return its remote path."""
        remote_path = self.get_remote_for_script(src)
        copy_paths(self.node.connection, {src: remote_path})
        return remote_path

    def pre_run(self):
        """Upload the preparation script and execute it on the node."""
        # Fixed: copy_script() takes only the source path, and the
        # attribute holding the preparation script is prepare_script.
        remote_script = self.copy_script(self.prepare_script)
        self.run_over_ssh(remote_script)

    def run(self, barrier):
        """Upload the run script, execute it and process its output.

        Every option is appended to the command line as ``key value``.
        ``barrier`` is accepted for interface compatibility and not used.
        """
        # Fixed: copy_script() takes only the source path.
        remote_script = self.copy_script(self.run_script)
        cmd_opts = ' '.join("%s %s" % (key, val)
                            for key, val in self.options.items())
        cmd = remote_script + ' ' + cmd_opts
        out_err = self.run_over_ssh(cmd)
        self.on_result(out_err, cmd)

    def parse_results(self, out):
        """Report every ``key:value`` line of ``out`` via ``on_result_cb``.

        Lines without a key or without a value are skipped; the value is
        converted with ``float`` (``ValueError`` if it is not numeric).
        """
        for line in out.split("\n"):
            key, separator, value = line.partition(":")
            if key and value:
                self.on_result_cb((key, float(value)))

    def on_result(self, out_err, cmd):
        """Parse command output, wrapping any failure in ``RuntimeError``.

        Raises:
            RuntimeError: when parsing or the result callback fails; the
                message carries the original error and the raw output.
        """
        try:
            self.parse_results(out_err)
        except Exception as exc:
            msg_templ = "Error during postprocessing results: {0!s}. {1}"
            raise RuntimeError(msg_templ.format(exc, out_err))
class PgBenchTest(TwoScriptTest):
    """Two-script test whose scripts sit next to the ``postgres`` module."""

    # Directory containing the postgres helper module and its shell scripts.
    root = os.path.split(postgres.__file__)[0]

    # Script paths consumed by TwoScriptTest.
    prepare_script = os.path.join(root, "prepare.sh")
    run_script = os.path.join(root, "run.sh")
class IOPerfTest(IPerfTest):
    """IO performance test driven by the io agent script on a remote node.

    The test config is parsed and sliced locally. Each slice is uploaded as
    a task file and started on the node inside a detached ``screen``
    session. Completion is detected by polling the agent's pid file over
    fresh SSH connections; the agent's output file is then read back,
    parsed and passed to ``on_result_cb``.
    """

    # Timeout handed to connect() when polling the node for test status.
    tcp_conn_timeout = 30
    # Seconds allowed for the agent's pid file to appear after start.
    max_pig_timeout = 5
    # Value passed as soft_runcycle to the config slicer when the
    # 'test_logging' option is enabled.
    soft_runcycle = 5 * 60

    def __init__(self, *dt, **mp):
        IPerfTest.__init__(self, *dt, **mp)
        self.config_fname = self.options['cfg']
        self.alive_check_interval = self.options.get('alive_check_interval')
        self.config_params = self.options.get('params', {})
        self.tool = self.options.get('tool', 'fio')

        # Read the raw config and release the file handle right away.
        with open(self.config_fname) as cfg_fd:
            self.raw_cfg = cfg_fd.read()

        self.configs = list(io_agent.parse_all_in_1(self.raw_cfg,
                                                    self.config_params))

        cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
        raw_res = os.path.join(self.log_directory, "raw_results.txt")

        self.io_py_remote = self.join_remote("agent.py")
        self.log_fl = self.join_remote("log.txt")
        self.pid_file = self.join_remote("pid")
        self.task_file = self.join_remote("task.cfg")
        self.use_sudo = self.options.get("use_sudo", True)
        self.test_logging = self.options.get("test_logging", False)

        fio_command_file = open_for_append_or_create(cmd_log)
        try:
            if self.test_logging:
                soft_runcycle = self.soft_runcycle
            else:
                soft_runcycle = None

            self.fio_configs = list(io_agent.parse_and_slice_all_in_1(
                self.raw_cfg,
                self.config_params,
                soft_runcycle=soft_runcycle))

            # Dump every compiled slice, separated by a dashed line.
            splitter = "\n\n" + "-" * 60 + "\n\n"
            cfg = splitter.join(map(io_agent.fio_config_to_str,
                                    self.fio_configs))
            fio_command_file.write(cfg)
        finally:
            # The handle is local to __init__; close it so the data is
            # flushed and the descriptor is not leaked.
            fio_command_file.close()

        self.fio_raw_results_file = open_for_append_or_create(raw_res)

    def __str__(self):
        return "{0}({1})".format(self.__class__.__name__,
                                 self.node.get_conn_id())

    def cleanup(self):
        """Post-test cleanup; currently a no-op."""
        # delete_file(conn, self.io_py_remote)
        # Need to remove temporary files used for testing
        pass

    def prefill_test_files(self):
        """Fill every test file on the node with zeroes using ``dd``.

        File sizes are rounded up to whole MiB. When several config
        sections share a file name, the largest size is used. The overall
        fill rate is logged and reported through ``coordinate`` as
        ``('init_bw', <rate>)``.
        """
        mib = 1024 ** 2
        files = {}

        for section in self.configs:
            sz = ssize_to_b(section.vals['size'])
            # Ceiling division that stays an integer under both classic
            # and true division, so dd always gets an integer count.
            msz, remainder = divmod(sz, mib)
            if remainder != 0:
                msz += 1

            fname = section.vals['filename']

            # if another test already uses the same file name,
            # take the largest size
            files[fname] = max(files.get(fname, 0), msz)

        cmd_templ = "dd oflag=direct " + \
                    "if=/dev/zero of={0} bs={1} count={2}"

        if self.use_sudo:
            cmd_templ = "sudo " + cmd_templ

        ssize = 0
        stime = time.time()

        for fname, curr_sz in files.items():
            cmd = cmd_templ.format(fname, mib, curr_sz)
            ssize += curr_sz
            # The timeout scales with the file size in MiB.
            self.run_over_ssh(cmd, timeout=curr_sz)

        ddtime = time.time() - stime
        # Skip the report when the elapsed time is too small to divide by.
        if ddtime > 1E-3:
            fill_bw = int(ssize / ddtime)
            mess = "Initiall dd fill bw is {0} MiBps for this vm"
            logger.info(mess.format(fill_bw))
            self.coordinate(('init_bw', fill_bw))

    def install_utils(self, max_retry=3, timeout=5):
        """Install ``fio`` and ``screen`` on the node when they are missing.

        Args:
            max_retry: number of installation attempts.
            timeout: seconds to sleep between failed attempts.

        Raises:
            OSError: when every installation attempt failed.
        """
        need_install = []
        for bin_name, package in (('fio', 'fio'), ('screen', 'screen')):
            try:
                self.run_over_ssh('which ' + bin_name, nolog=True)
            except OSError:
                # An OSError from `which` is taken to mean "not installed".
                need_install.append(package)

        if not need_install:
            return

        cmd = "sudo apt-get -y install " + " ".join(need_install)

        # Keep the last error in a separate name: the `except ... as`
        # target is unbound after the clause on Python 3, and is never
        # bound at all when max_retry is 0.
        last_err = None
        for attempt in range(max_retry):
            try:
                self.run_over_ssh(cmd)
                return
            except OSError as err:
                last_err = err

            # No point in sleeping after the final attempt.
            if attempt + 1 < max_retry:
                time.sleep(timeout)

        raise OSError("Can't install - " + str(last_err))

    def pre_run(self):
        """Prepare the node: working dir, tools, agent script, test files.

        Raises:
            StopTestError: when the remote working directory cannot be
                created.
        """
        try:
            cmd = 'mkdir -p "{0}"'.format(self.remote_dir)
            if self.use_sudo:
                cmd = "sudo " + cmd
                # The directory was created via sudo; hand it to the
                # node's user.
                cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
                                                      self.remote_dir)

            self.run_over_ssh(cmd)
        except Exception as exc:
            msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
            msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
            logger.error(msg)
            raise StopTestError(msg, exc)

        self.install_utils()

        # io_agent.__file__ may point at a compiled file; always ship
        # the .py source.
        local_fname = os.path.splitext(io_agent.__file__)[0] + ".py"
        files_to_copy = {local_fname: self.io_py_remote}
        copy_paths(self.node.connection, files_to_copy)

        if self.options.get('prefill_files', True):
            self.prefill_test_files()
        elif self.is_primary:
            logger.warning("Prefilling of test files is disabled")

    def check_process_is_running(self, sftp, pid):
        """Return True when ``/proc/<pid>`` can be stat'ed over ``sftp``."""
        try:
            sftp.stat("/proc/{0}".format(pid))
            return True
        except (OSError, IOError, NameError):
            return False

    def kill_remote_process(self, conn, pid, soft=True):
        """Kill process ``pid`` on the node.

        Args:
            conn: unused; the command goes through ``run_over_ssh``.
            pid: id of the process to kill.
            soft: plain ``kill`` when true, ``kill -9`` otherwise.

        Returns:
            True when the command succeeded, False when it raised OSError.
        """
        try:
            if soft:
                cmd = "kill {0}"
            else:
                cmd = "kill -9 {0}"

            if self.use_sudo:
                cmd = "sudo " + cmd

            self.run_over_ssh(cmd.format(pid))
            return True
        except OSError:
            return False

    def get_test_status(self, die_timeout=3):
        """Open a fresh connection to the node and inspect the pid file.

        Args:
            die_timeout: unused.

        Returns:
            Tuple ``(is_connected, is_running, pid, err)``. ``is_running``
            and ``pid`` keep their last assigned values (``None`` if never
            assigned) when the connection fails; ``err`` is the connection
            error text, or ``None``.
        """
        is_connected = None
        is_running = None
        pid = None
        err = None

        try:
            conn = connect(self.node.conn_url,
                           conn_timeout=self.tcp_conn_timeout)
            with conn:
                with conn.open_sftp() as sftp:
                    try:
                        pid = read_from_remote(sftp, self.pid_file)
                        is_running = True
                    except (NameError, IOError, OSError):
                        # No readable pid file: not started or finished.
                        pid = None
                        is_running = False

                    if is_running:
                        if not self.check_process_is_running(sftp, pid):
                            # Stale pid file: the process is gone.
                            try:
                                sftp.remove(self.pid_file)
                            except (IOError, NameError, OSError):
                                pass
                            is_running = False

            is_connected = True

        except (socket.error, SSHException, EOFError, SFTPError) as exc:
            err = str(exc)
            is_connected = False

        return is_connected, is_running, pid, err

    def wait_till_finished(self, soft_timeout, timeout):
        """Poll the node until the test finishes or ``timeout`` expires.

        Args:
            soft_timeout: expected run time in seconds. Once the test is
                seen running, polling pauses until this much time has
                passed since the call.
            timeout: hard limit in seconds; the method returns silently
                when it is reached.

        Raises:
            StopTestError: when the pid file never appears within
                ``max_pig_timeout`` seconds.
        """
        conn_id = self.node.get_conn_id()
        end_of_wait_time = timeout + time.time()
        soft_end_of_wait_time = soft_timeout + time.time()

        # time_till_check = random.randint(30, 90)
        time_till_check = 5
        pid = None
        is_running = False
        pid_get_timeout = self.max_pig_timeout + time.time()
        curr_connected = True

        while end_of_wait_time > time.time():
            time.sleep(time_till_check)

            is_connected, is_running, npid, err = self.get_test_status()

            if is_connected and not is_running:
                if pid is None:
                    # The process was never seen running.
                    if time.time() > pid_get_timeout:
                        msg = ("On node {0} pid file doesn't " +
                               "appears in time")
                        logger.error(msg.format(conn_id))
                        raise StopTestError("Start timeout")
                else:
                    # execution finished
                    break

            if npid is not None:
                pid = npid

            if is_connected and pid is not None and is_running:
                # Read the clock once: a negative value must never reach
                # time.sleep(), which rejects it.
                soft_wait_left = soft_end_of_wait_time - time.time()
                if soft_wait_left > 0:
                    time.sleep(soft_wait_left)

            if is_connected and not curr_connected:
                msg = "Connection with {0} is restored"
                logger.debug(msg.format(conn_id))
            elif not is_connected and curr_connected:
                msg = "Lost connection with {0}. Error: {1}"
                logger.debug(msg.format(conn_id, err))

            curr_connected = is_connected

    @staticmethod
    def _summarize_names(names):
        """Return names in first-seen order, repeats shown as 'name * N'."""
        counts = {}
        order = []
        for name in names:
            if name not in counts:
                counts[name] = 0
                order.append(name)
            counts[name] += 1

        return [name if counts[name] == 1
                else "{0} * {1}".format(name, counts[name])
                for name in order]

    def run(self, barrier):
        """Run every config slice in turn and publish the parsed results.

        ``barrier.exit()`` is always called at the end, even on failure.

        Raises:
            RuntimeError: when result post-processing fails with anything
                other than OSError or StopTestError, which propagate
                unchanged.
        """
        try:
            if len(self.fio_configs) > 1 and self.is_primary:
                exec_time = 0
                for test in self.fio_configs:
                    exec_time += io_agent.calculate_execution_time(test)

                # +5% - is a rough estimation for additional operations
                # like sftp, etc
                exec_time = int(exec_time * 1.05)

                exec_time_s = sec_to_str(exec_time)
                now_dt = datetime.datetime.now()
                end_dt = now_dt + datetime.timedelta(0, exec_time)
                msg = "Entire test should takes aroud: {0} and finished at {1}"
                logger.info(msg.format(exec_time_s,
                                       end_dt.strftime("%H:%M:%S")))

            for pos, fio_cfg_slice in enumerate(self.fio_configs):
                names = [i.name for i in fio_cfg_slice]
                msgs = self._summarize_names(names)

                if self.is_primary:
                    logger.info("Will run tests: " + ", ".join(msgs))

                # Only the primary node logs, and only for the first slice.
                nolog = (pos != 0) or not self.is_primary
                out_err = self.do_run(barrier, fio_cfg_slice, nolog=nolog)

                try:
                    for data in parse_output(out_err):
                        data['__meta__']['raw_cfg'] = self.raw_cfg
                        self.on_result_cb(data)
                except (OSError, StopTestError):
                    raise
                except Exception as exc:
                    msg_templ = "Error during postprocessing results: {0!s}"
                    raise RuntimeError(msg_templ.format(exc))

        finally:
            barrier.exit()

    def do_run(self, barrier, cfg, nolog=False):
        """Run one config slice on the node and return the agent's output.

        Uploads the task file, waits on ``barrier``, starts the agent in a
        detached ``screen`` session, closes the SSH connection while the
        test runs (unless the connection is ``Local``), waits for
        completion, reconnects and reads the results file.

        Args:
            barrier: object whose ``wait()`` is called right before start.
            cfg: config slice to execute.
            nolog: suppress debug logging and command logging when true.

        Returns:
            Content of the remote results file.
        """
        conn_id = self.node.get_conn_id()

        cmd_templ = "screen -S {screen_name} -d -m " + \
                    "env python2 {0} -p {pid_file} -o {results_file} " + \
                    "--type {1} {2} --json {3}"

        # Same value the original re-read from the options in this method.
        if self.use_sudo:
            cmd_templ = "sudo " + cmd_templ

        params = " ".join("{0}={1}".format(k, v)
                          for k, v in self.config_params.items())

        if "" != params:
            params = "--params " + params

        with self.node.connection.open_sftp() as sftp:
            save_to_remote(sftp, self.task_file,
                           io_agent.fio_config_to_str(cfg))

        screen_name = self.test_uuid
        cmd = cmd_templ.format(self.io_py_remote,
                               self.tool,
                               params,
                               self.task_file,
                               pid_file=self.pid_file,
                               results_file=self.log_fl,
                               screen_name=screen_name)

        exec_time = io_agent.calculate_execution_time(cfg)
        exec_time_str = sec_to_str(exec_time)

        # Hard limit: at least 300 seconds of slack, or twice the
        # expected run time.
        timeout = int(exec_time + max(300, exec_time))
        soft_tout = exec_time

        barrier.wait()
        self.run_over_ssh(cmd, nolog=nolog)

        if self.is_primary:
            templ = "Test should takes about {0}." + \
                    " Should finish at {1}," + \
                    " will wait at most till {2}"
            now_dt = datetime.datetime.now()
            end_dt = now_dt + datetime.timedelta(0, exec_time)
            wait_till = now_dt + datetime.timedelta(0, timeout)

            logger.info(templ.format(exec_time_str,
                                     end_dt.strftime("%H:%M:%S"),
                                     wait_till.strftime("%H:%M:%S")))

        if not nolog:
            msg = "Tests started in screen {1} on each testnode"
            logger.debug(msg.format(conn_id, screen_name))

        # TODO: add monitoring socket
        if self.node.connection is not Local:
            self.node.connection.close()

        self.wait_till_finished(soft_tout, timeout)
        if not nolog:
            logger.debug("Test on node {0} is finished".format(conn_id))

        if self.node.connection is not Local:
            # Reconnect with a timeout three times the polling one.
            conn_timeout = self.tcp_conn_timeout * 3
            self.node.connection = connect(self.node.conn_url,
                                           conn_timeout=conn_timeout)

        with self.node.connection.open_sftp() as sftp:
            return read_from_remote(sftp, self.log_fl)

    def merge_results(self, results):
        """Merge per-run result dicts into the first one and return it.

        ``results[0]`` is modified in place. The fields 'bw', 'clat',
        'iops', 'lat' and 'slat' are concatenated; every other field must
        be equal across results.

        Returns:
            The merged result, or None when ``results`` is empty.

        Raises:
            AssertionError: when metadata, the key set of a test, or a
                non-mergeable field differs between results.
        """
        if len(results) == 0:
            return None

        merged_result = results[0]
        merged_data = merged_result['res']
        mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']

        for res in results[1:]:
            assert res['__meta__'] == merged_result['__meta__']

            data = res['res']
            for testname, test_data in data.items():
                if testname not in merged_data:
                    merged_data[testname] = test_data
                    continue

                res_test_data = merged_data[testname]

                diff = set(test_data.keys()).symmetric_difference(
                    res_test_data.keys())

                msg = "Difference: {0}".format(",".join(diff))
                assert len(diff) == 0, msg

                for k, v in test_data.items():
                    if k in mergable_fields:
                        res_test_data[k].extend(v)
                    else:
                        msg = "{0!r} != {1!r}".format(res_test_data[k], v)
                        assert res_test_data[k] == v, msg

        return merged_result

    @classmethod
    def format_for_console(cls, data):
        """Format results for the console via the io formatter module."""
        return io_formatter.format_results_for_console(data)