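Temporary (WIP) commit: empty wally/suits package modules, guard remote cleanup

Empties wally/suits/__init__.py and wally/suits/io/__init__.py, and in wally/suits/io/fio.py replaces the "cd <exec_folder> ; rm -rf *" cleanup with an assertion that exec_folder is neither empty nor "/" followed by "rm -rf <exec_folder>/*".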
diff --git a/wally/suits/__init__.py b/wally/suits/__init__.py
index c4e8854..e69de29 100644
--- a/wally/suits/__init__.py
+++ b/wally/suits/__init__.py
@@ -1,5 +0,0 @@
-from .io import IOPerfTest
-from .mysql import MysqlTest
-from .postgres import PgBenchTest
-
-__all__ = ["MysqlTest", "PgBenchTest", "IOPerfTest"]
diff --git a/wally/suits/io/__init__.py b/wally/suits/io/__init__.py
index 8e82cae..e69de29 100644
--- a/wally/suits/io/__init__.py
+++ b/wally/suits/io/__init__.py
@@ -1,453 +0,0 @@
-import re
-import time
-import json
-import os.path
-import logging
-import datetime
-
-import paramiko
-
-from wally.utils import (ssize2b, sec_to_str, StopTestError)
-
-from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask,
- reconnect)
-
-from ..itest import IPerfTest, TestResults
-from .formatter import format_results_for_console
-from .fio_task_parser import (execution_time, fio_cfg_compile,
- get_test_summary, FioJobSection)
-
-
-logger = logging.getLogger("wally")
-
-
-class IOTestResults(TestResults):
- def summary(self):
- return get_test_summary(self.config) + "vm" + str(self.vm_count)
-
- def get_yamable(self):
- return {
- 'type': "fio_test",
- 'params': self.params,
- 'config': (self.config.name, self.config.vals),
- 'results': self.results,
- 'raw_result': self.raw_result,
- 'run_interval': self.run_interval,
- 'vm_count': self.vm_count,
- 'test_name': self.test_name,
- 'files': self.files
- }
-
- @classmethod
- def from_yaml(cls, data):
- name, vals = data['config']
- sec = FioJobSection(name)
- sec.vals = vals
-
- return cls(sec, data['params'], data['results'],
- data['raw_result'], data['run_interval'],
- data['vm_count'], data['test_name'],
- files=data.get('files', {}))
-
-
-def get_slice_parts_offset(test_slice, real_inteval):
- calc_exec_time = sum(map(execution_time, test_slice))
- coef = (real_inteval[1] - real_inteval[0]) / calc_exec_time
- curr_offset = real_inteval[0]
- for section in test_slice:
- slen = execution_time(section) * coef
- yield (curr_offset, curr_offset + slen)
- curr_offset += slen
-
-
-class IOPerfTest(IPerfTest):
- tcp_conn_timeout = 30
- max_pig_timeout = 5
- soft_runcycle = 5 * 60
-
- def __init__(self, *dt, **mp):
- IPerfTest.__init__(self, *dt, **mp)
- self.config_fname = self.options['cfg']
-
- if '/' not in self.config_fname and '.' not in self.config_fname:
- cfgs_dir = os.path.dirname(__file__)
- self.config_fname = os.path.join(cfgs_dir,
- self.config_fname + '.cfg')
-
- self.alive_check_interval = self.options.get('alive_check_interval')
-
- self.config_params = self.options.get('params', {}).copy()
- self.tool = self.options.get('tool', 'fio')
-
- self.io_py_remote = self.join_remote("agent.py")
- self.results_file = self.join_remote("results.json")
- self.pid_file = self.join_remote("pid")
- self.task_file = self.join_remote("task.cfg")
- self.sh_file = self.join_remote("cmd.sh")
- self.err_out_file = self.join_remote("fio_err_out")
- self.exit_code_file = self.join_remote("exit_code")
- self.use_sudo = self.options.get("use_sudo", True)
- self.test_logging = self.options.get("test_logging", False)
- self.raw_cfg = open(self.config_fname).read()
- self.fio_configs = fio_cfg_compile(self.raw_cfg,
- self.config_fname,
- self.config_params,
- split_on_names=self.test_logging)
- self.fio_configs = list(self.fio_configs)
-
- def __str__(self):
- return "{0}({1})".format(self.__class__.__name__,
- self.node.get_conn_id())
-
- @classmethod
- def load(cls, data):
- return IOTestResults.from_yaml(data)
-
- def cleanup(self):
- # delete_file(conn, self.io_py_remote)
- # Need to remove tempo files, used for testing
- pass
-
- def prefill_test_files(self):
- files = {}
- for cfg_slice in self.fio_configs:
- for section in cfg_slice:
- sz = ssize2b(section.vals['size'])
- msz = sz / (1024 ** 2)
-
- if sz % (1024 ** 2) != 0:
- msz += 1
-
- fname = section.vals['filename']
-
- # if already has other test with the same file name
- # take largest size
- files[fname] = max(files.get(fname, 0), msz)
-
- cmd_templ = "fio --name=xxx --filename={0} --direct=1" + \
- " --bs=4m --size={1}m --rw=write"
-
- if self.use_sudo:
- cmd_templ = "sudo " + cmd_templ
-
- ssize = 0
- stime = time.time()
-
- for fname, curr_sz in files.items():
- cmd = cmd_templ.format(fname, curr_sz)
- ssize += curr_sz
- self.run_over_ssh(cmd, timeout=curr_sz)
-
- ddtime = time.time() - stime
- if ddtime > 1E-3:
- fill_bw = int(ssize / ddtime)
- mess = "Initiall dd fill bw is {0} MiBps for this vm"
- logger.info(mess.format(fill_bw))
- self.coordinate(('init_bw', fill_bw))
-
- def install_utils(self, max_retry=3, timeout=5):
- need_install = []
- for bin_name, package in (('fio', 'fio'), ('screen', 'screen')):
- try:
- self.run_over_ssh('which ' + bin_name, nolog=True)
- except OSError:
- need_install.append(package)
-
- if len(need_install) == 0:
- return
-
- cmd = "sudo apt-get -y install " + " ".join(need_install)
-
- for _ in range(max_retry):
- try:
- self.run_over_ssh(cmd)
- break
- except OSError as err:
- time.sleep(timeout)
- else:
- raise OSError("Can't install - " + str(err))
-
- def pre_run(self):
- try:
- cmd = 'mkdir -p "{0}"'.format(self.remote_dir)
- if self.use_sudo:
- cmd = "sudo " + cmd
- cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
- self.remote_dir)
-
- self.run_over_ssh(cmd)
- except Exception as exc:
- msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
- msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
- logger.exception(msg)
- raise StopTestError(msg, exc)
-
- self.install_utils()
-
- if self.options.get('prefill_files', True):
- self.prefill_test_files()
- elif self.is_primary:
- logger.warning("Prefilling of test files is disabled")
-
- def run(self, barrier):
- try:
- if len(self.fio_configs) > 1 and self.is_primary:
-
- exec_time = 0
- for test_slice in self.fio_configs:
- exec_time += sum(map(execution_time, test_slice))
-
- # +10% - is a rough estimation for additional operations
- # like sftp, etc
- exec_time = int(exec_time * 1.1)
-
- exec_time_s = sec_to_str(exec_time)
- now_dt = datetime.datetime.now()
- end_dt = now_dt + datetime.timedelta(0, exec_time)
- msg = "Entire test should takes aroud: {0} and finished at {1}"
- logger.info(msg.format(exec_time_s,
- end_dt.strftime("%H:%M:%S")))
-
- for pos, fio_cfg_slice in enumerate(self.fio_configs):
- fio_cfg_slice = list(fio_cfg_slice)
- names = [i.name for i in fio_cfg_slice]
- msgs = []
- already_processed = set()
- for name in names:
- if name not in already_processed:
- already_processed.add(name)
-
- if 1 == names.count(name):
- msgs.append(name)
- else:
- frmt = "{0} * {1}"
- msgs.append(frmt.format(name,
- names.count(name)))
-
- if self.is_primary:
- logger.info("Will run tests: " + ", ".join(msgs))
-
- nolog = (pos != 0) or not self.is_primary
-
- max_retr = 3 if self.total_nodes_count == 1 else 1
-
- for idx in range(max_retr):
- try:
- out_err, interval, files = self.do_run(barrier, fio_cfg_slice, pos,
- nolog=nolog)
- break
- except Exception as exc:
- logger.exception("During fio run")
- if idx == max_retr - 1:
- raise StopTestError("Fio failed", exc)
- logger.info("Sleeping 30s and retrying")
- time.sleep(30)
-
- try:
- # HACK
- out_err = "{" + out_err.split("{", 1)[1]
- full_raw_res = json.loads(out_err)
-
- res = {"bw": [],
- "iops": [],
- "lat": [],
- "clat": [],
- "slat": []}
-
- for raw_result in full_raw_res['jobs']:
- load_data = raw_result['mixed']
-
- res["bw"].append(load_data["bw"])
- res["iops"].append(load_data["iops"])
- res["lat"].append(load_data["lat"]["mean"])
- res["clat"].append(load_data["clat"]["mean"])
- res["slat"].append(load_data["slat"]["mean"])
-
- first = fio_cfg_slice[0]
- p1 = first.vals.copy()
- p1.pop('ramp_time', 0)
- p1.pop('offset', 0)
-
- for nxt in fio_cfg_slice[1:]:
- assert nxt.name == first.name
- p2 = nxt.vals
- p2.pop('_ramp_time', 0)
- p2.pop('offset', 0)
- assert p1 == p2
-
- tname = os.path.basename(self.config_fname)
- if tname.endswith('.cfg'):
- tname = tname[:-4]
-
- tres = IOTestResults(first,
- self.config_params, res,
- full_raw_res, interval,
- test_name=tname,
- vm_count=self.total_nodes_count,
- files=files)
- self.on_result_cb(tres)
- except StopTestError:
- raise
- except Exception as exc:
- msg_templ = "Error during postprocessing results"
- logger.exception(msg_templ)
- raise StopTestError(msg_templ.format(exc), exc)
-
- finally:
- barrier.exit()
-
- def do_run(self, barrier, cfg_slice, pos, nolog=False):
- exec_folder = os.path.dirname(self.task_file)
- bash_file = "#!/bin/bash\n" + \
- "cd {exec_folder}\n" + \
- "fio --output-format=json --output={out_file} " + \
- "--alloc-size=262144 {job_file} " + \
- " >{err_out_file} 2>&1 \n" + \
- "echo $? >{res_code_file}\n"
-
- conn_id = self.node.get_conn_id()
- fconn_id = conn_id.replace(":", "_")
-
- bash_file = bash_file.format(out_file=self.results_file,
- job_file=self.task_file,
- err_out_file=self.err_out_file,
- res_code_file=self.exit_code_file,
- exec_folder=exec_folder)
-
- task_fc = "\n\n".join(map(str, cfg_slice))
- with self.node.connection.open_sftp() as sftp:
- save_to_remote(sftp, self.task_file, task_fc)
- save_to_remote(sftp, self.sh_file, bash_file)
-
- fname = "{0}_{1}.fio".format(pos, fconn_id)
- with open(os.path.join(self.log_directory, fname), "w") as fd:
- fd.write(task_fc)
-
- exec_time = sum(map(execution_time, cfg_slice))
- exec_time_str = sec_to_str(exec_time)
-
- timeout = int(exec_time + max(300, exec_time))
- soft_tout = exec_time
-
- barrier.wait()
-
- if self.is_primary:
- templ = "Test should takes about {0}." + \
- " Should finish at {1}," + \
- " will wait at most till {2}"
- now_dt = datetime.datetime.now()
- end_dt = now_dt + datetime.timedelta(0, exec_time)
- wait_till = now_dt + datetime.timedelta(0, timeout)
-
- logger.info(templ.format(exec_time_str,
- end_dt.strftime("%H:%M:%S"),
- wait_till.strftime("%H:%M:%S")))
-
- task = BGSSHTask(self.node, self.options.get("use_sudo", True))
- begin = time.time()
-
- if self.options.get("use_sudo", True):
- sudo = "sudo "
- else:
- sudo = ""
-
- fnames_before = self.run_over_ssh("ls -1 " + exec_folder, nolog=True)
- task.start(sudo + "bash " + self.sh_file)
-
- while True:
- try:
- task.wait(soft_tout, timeout)
- break
- except paramiko.SSHException:
- pass
-
- try:
- self.node.connection.close()
- except:
- pass
-
- reconnect(self.node.connection, self.node.conn_url)
-
- end = time.time()
- fnames_after = self.run_over_ssh("ls -1 " + exec_folder, nolog=True)
-
- new_files = set(fnames_after.split()) - set(fnames_before.split())
-
- if not nolog:
- logger.debug("Test on node {0} is finished".format(conn_id))
-
- log_files_re = set()
- for cfg in cfg_slice:
- if 'write_lat_log' in cfg.vals:
- fname = cfg.vals['write_lat_log']
- log_files_re.add(fname + '_clat.*log')
- log_files_re.add(fname + '_lat.*log')
- log_files_re.add(fname + '_slat.*log')
-
- if 'write_iops_log' in cfg.vals:
- fname = cfg.vals['write_iops_log']
- log_files_re.add(fname + '_iops.*log')
-
- if 'write_bw_log' in cfg.vals:
- fname = cfg.vals['write_bw_log']
- log_files_re.add(fname + '_bw.*log')
-
- log_files = set()
- for fname in new_files:
- for rexpr in log_files_re:
- if re.match(rexpr + "$", fname):
- log_files.add(fname)
-
- with self.node.connection.open_sftp() as sftp:
- result = read_from_remote(sftp, self.results_file)
- exit_code = read_from_remote(sftp, self.exit_code_file)
- err_out = read_from_remote(sftp, self.err_out_file)
- exit_code = exit_code.strip()
-
- if exit_code != '0':
- msg = "fio exit with code {0}: {1}".format(exit_code, err_out)
- logger.critical(msg.strip())
- raise StopTestError("fio failed")
-
- sftp.remove(self.results_file)
- sftp.remove(self.err_out_file)
- sftp.remove(self.exit_code_file)
-
- fname = "{0}_{1}.json".format(pos, fconn_id)
- with open(os.path.join(self.log_directory, fname), "w") as fd:
- fd.write(result)
-
- files = {}
- for fname in log_files:
- rpath = os.path.join(exec_folder, fname)
-
- try:
- fc = read_from_remote(sftp, rpath)
- except Exception as exc:
- msg = "Can't read file {0} from remote: {1}".format(rpath, exc)
- logger.error(msg)
- continue
-
- sftp.remove(os.path.join(exec_folder, fname))
- ftype = fname.split('_')[-1].split(".")[0]
- loc_fname = "{0}_{1}_{2}.log".format(pos, fconn_id, ftype)
- files.setdefault(ftype, []).append(loc_fname)
-
- loc_path = os.path.join(self.log_directory, loc_fname)
-
- with open(loc_path, "w") as fd:
- fd.write(fc)
-
- return result, (begin, end), files
-
- @classmethod
- def merge_results(cls, results):
- merged = results[0]
- for block in results[1:]:
- assert block["__meta__"] == merged["__meta__"]
- merged['res'].extend(block['res'])
- return merged
-
- @classmethod
- def format_for_console(cls, data, dinfo):
- return format_results_for_console(dinfo)
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index f7fc9bc..670f42b 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -503,7 +503,8 @@
res_code_file=self.exit_code_file,
exec_folder=exec_folder)
- run_on_node(node)("cd {0} ; rm -rf *".format(exec_folder), nolog=True)
+ assert exec_folder != "" and exec_folder != "/"
+ run_on_node(node)("rm -rf {0}/*".format(exec_folder), nolog=True)
with node.connection.open_sftp() as sftp:
print ">>>>", self.task_file