temp commit
diff --git a/wally/run_test.py b/wally/run_test.py
index 359d917..d7e803a 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -1,15 +1,14 @@
from __future__ import print_function
import os
+import re
import sys
import time
-import Queue
import pprint
import signal
import logging
import argparse
import functools
-import threading
import contextlib
import collections
@@ -36,11 +35,16 @@
from wally.discover import discover, Node
from wally.timeseries import SensorDatastore
from wally import utils, report, ssh_utils, start_vms
-from wally.suits import IOPerfTest, PgBenchTest, MysqlTest
from wally.config import (cfg_dict, load_config, setup_loggers,
get_test_files, save_run_params, load_run_params)
from wally.sensors_utils import with_sensors_util, sensors_info_util
+from wally.suits.mysql import MysqlTest
+from wally.suits.itest import TestConfig
+from wally.suits.io.fio import IOPerfTest
+from wally.suits.postgres import PgBenchTest
+
+
TOOL_TYPE_MAPPER = {
"io": IOPerfTest,
"pgbench": PgBenchTest,
@@ -137,100 +141,19 @@
logger.debug("Raw hardware info in " + cfg['hwinfo_directory'] + " folder")
-def test_thread(test, node, barrier, res_q):
- exc = None
- try:
- logger.debug("Run preparation for {0}".format(node.get_conn_id()))
- test.pre_run()
- logger.debug("Run test for {0}".format(node.get_conn_id()))
- test.run(barrier)
- except utils.StopTestError as exc:
- pass
- except Exception as exc:
- msg = "In test {0} for node {1}"
- msg = msg.format(test, node.get_conn_id())
- logger.exception(msg)
- exc = utils.StopTestError(msg, exc)
-
- try:
- test.cleanup()
- except utils.StopTestError as exc1:
- if exc is None:
- exc = exc1
- except:
- msg = "Duringf cleanup - in test {0} for node {1}"
- logger.exception(msg.format(test, node))
-
- if exc is not None:
- res_q.put(exc)
-
-
-def run_single_test(test_nodes, name, test_cls, params, log_directory,
+def run_single_test(test_nodes, name, params, log_directory,
test_local_folder, run_uuid):
- logger.info("Starting {0} tests".format(name))
- res_q = Queue.Queue()
- threads = []
- coord_q = Queue.Queue()
- rem_folder = test_local_folder.format(name=name)
- barrier = utils.Barrier(len(test_nodes))
- for idx, node in enumerate(test_nodes):
- msg = "Starting {0} test on {1} node"
- logger.debug(msg.format(name, node.conn_url))
+ test_cls = TOOL_TYPE_MAPPER[name]
+ test_cfg = TestConfig(test_cls.__name__,
+ params=params,
+ test_uuid=run_uuid,
+ nodes=test_nodes,
+ log_directory=log_directory,
+ remote_dir=test_local_folder.format(name=name))
- params = params.copy()
- params['testnodes_count'] = len(test_nodes)
- test = test_cls(options=params,
- is_primary=(idx == 0),
- on_result_cb=res_q.put,
- test_uuid=run_uuid,
- node=node,
- remote_dir=rem_folder,
- log_directory=log_directory,
- coordination_queue=coord_q,
- total_nodes_count=len(test_nodes))
- th = threading.Thread(None, test_thread,
- "Test:" + node.get_conn_id(),
- (test, node, barrier, res_q))
- threads.append(th)
- th.daemon = True
- th.start()
-
- th = threading.Thread(None, test_cls.coordination_th,
- "Coordination thread",
- (coord_q, barrier, len(threads)))
- threads.append(th)
- th.daemon = True
- th.start()
-
- results = []
- coord_q.put(None)
-
- while len(threads) != 0:
- nthreads = []
- time.sleep(0.1)
-
- for th in threads:
- if not th.is_alive():
- th.join()
- else:
- nthreads.append(th)
-
- threads = nthreads
-
- while not res_q.empty():
- val = res_q.get()
-
- if isinstance(val, utils.StopTestError):
- raise val
-
- if isinstance(val, Exception):
- msg = "Exception during test execution: {0!s}"
- raise ValueError(msg.format(val))
-
- results.append(val)
-
- return results
+ test = test_cls(test_cfg)
+ return test.run()
def suspend_vm_nodes(unused_nodes):
@@ -311,14 +234,12 @@
len(resumable_nodes_ids)))
start_vms.unpause(resumable_nodes_ids)
- test_cls = TOOL_TYPE_MAPPER[name]
try:
sens_nodes = curr_test_nodes + not_test_nodes
with sensors_info_util(cfg, sens_nodes) as sensor_data:
t_start = time.time()
res = run_single_test(curr_test_nodes,
name,
- test_cls,
params,
dir_path,
cfg['default_test_local_folder'],
@@ -432,27 +353,6 @@
ctx.nodes.append(node)
-def get_creds_openrc(path):
- fc = open(path).read()
-
- echo = 'echo "$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
-
- msg = "Failed to get creads from openrc file"
- with utils.log_error(msg):
- data = utils.run_locally(['/bin/bash'],
- input_data=fc + "\n" + echo)
-
- msg = "Failed to get creads from openrc file: " + data
- with utils.log_error(msg):
- data = data.strip()
- user, tenant, passwd_auth_url = data.split(':', 2)
- passwd, auth_url = passwd_auth_url.rsplit("@", 1)
- assert (auth_url.startswith("https://") or
- auth_url.startswith("http://"))
-
- return user, passwd, tenant, auth_url
-
-
def get_OS_credentials(cfg, ctx):
creds = None
tenant = None
@@ -461,8 +361,7 @@
os_cfg = cfg['clouds']['openstack']
if 'OPENRC' in os_cfg:
logger.info("Using OS credentials from " + os_cfg['OPENRC'])
- user, passwd, tenant, auth_url = \
- get_creds_openrc(os_cfg['OPENRC'])
+ user, passwd, tenant, auth_url = utils.get_creds_openrc(os_cfg['OPENRC'])
elif 'ENV' in os_cfg:
logger.info("Using OS credentials from shell environment")
user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
@@ -488,8 +387,7 @@
'tenant': tenant,
'auth_url': auth_url}
- msg = "OS_CREDS: user={name} tenant={tenant} auth_url={auth_url}"
- logger.debug(msg.format(**creds))
+ logger.debug("OS_CREDS: user={name} tenant={tenant} auth_url={auth_url}".format(**creds))
return creds
@@ -516,8 +414,7 @@
new_nodes = []
try:
- for new_node, node_id in start_vms.launch_vms(params,
- already_has_count):
+ for new_node, node_id in start_vms.launch_vms(params, already_has_count):
new_node.roles.append('testnode')
ctx.nodes.append(new_node)
os_nodes_ids.append(node_id)
@@ -695,24 +592,23 @@
ctx.results[tp] = map(cls.load, results)
+def load_data_from_path(var_dir, _, ctx):
+ ctx.results = {}
+ res_dir = os.path.join(var_dir, 'results')
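+    # result directories are named "{type}_{num}" (e.g. "io_0");
+    # anything else in the results folder is ignored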
+ for dir_name in os.listdir(res_dir):
+ dir_path = os.path.join(res_dir, dir_name)
+ if not os.path.isdir(dir_path):
+ continue
+ rr = re.match(r"(?P<type>[a-z]+)_\d+$", dir_name)
+ if rr is None:
+ continue
+ tp = rr.group('type')
+ arr = ctx.results.setdefault(tp, [])
+ arr.extend(TOOL_TYPE_MAPPER[tp].load(dir_path))
+
+
def load_data_from(var_dir):
- return functools.partial(load_data_from_file, var_dir)
-
-
-def start_web_ui(cfg, ctx):
- if webui is None:
- logger.error("Can't start webui. Install cherrypy module")
- ctx.web_thread = None
- else:
- th = threading.Thread(None, webui.web_main_thread, "webui", (None,))
- th.daemon = True
- th.start()
- ctx.web_thread = th
-
-
-def stop_web_ui(cfg, ctx):
- webui.web_main_stop()
- time.sleep(1)
+ return functools.partial(load_data_from_path, var_dir)
def parse_args(argv):
@@ -899,9 +795,6 @@
cfg_dict['no_tests'] = opts.no_tests
cfg_dict['dont_discover_nodes'] = opts.dont_discover_nodes
- if cfg_dict.get('run_web_ui', False):
- start_web_ui(cfg_dict, ctx)
-
for stage in stages:
ok = False
with log_stage(stage):
@@ -928,9 +821,6 @@
logger.info("All info stored in {0} folder".format(cfg_dict['var_dir']))
- if cfg_dict.get('run_web_ui', False):
- stop_web_ui(cfg_dict, ctx)
-
if exc is None:
logger.info("Tests finished successfully")
return 0
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
new file mode 100644
index 0000000..f7fc9bc
--- /dev/null
+++ b/wally/suits/io/fio.py
@@ -0,0 +1,702 @@
+import re
+import time
+import json
+import os.path
+import logging
+import datetime
+import functools
+import subprocess
+import collections
+
+import yaml
+import paramiko
+import texttable
+from paramiko.ssh_exception import SSHException
+from concurrent.futures import ThreadPoolExecutor
+
+from wally.pretty_yaml import dumps
+from wally.statistic import round_3_digit, data_property
+from wally.utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
+from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask, reconnect)
+
+from .fio_task_parser import (execution_time, fio_cfg_compile,
+ get_test_summary, get_test_sync_mode)
+from ..itest import TimeSeriesValue, PerfTest, TestResults, run_on_node
+
+logger = logging.getLogger("wally")
+
+
+# Results folder structure
+# results/
+# {loadtype}_{num}/
+# config.yaml
+# ......
+
+
+class NoData(object):
+ pass
+
+
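+# compute a property lazily and memoize it in the matching "_<name>"
+# attribute, which the owner class is expected to pre-set to NoData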
+def cached_prop(func):
+ @property
+ @functools.wraps(func)
+ def closure(self):
+ val = getattr(self, "_" + func.__name__)
+ if val is NoData:
+ val = func(self)
+ setattr(self, "_" + func.__name__, val)
+ return val
+ return closure
+
+
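+# fio log lines are "time_ms, value, ..." records; keep the first two
+# columns, converting the timestamp from milliseconds to seconds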
+def load_fio_log_file(fname):
+ with open(fname) as fd:
+ it = [ln.split(',')[:2] for ln in fd]
+ vals = [(float(off) / 1000, float(val.strip())) for off, val in it]
+ return TimeSeriesValue(vals)
+
+
+def load_test_results(cls, folder, run_num):
+ res = {}
+
+ fn = os.path.join(folder, str(run_num) + '_params.yaml')
+ params = yaml.load(open(fn).read())
+
+ conn_ids = set()
+ for fname in os.listdir(folder):
+ rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
+ rm = re.match(rr, fname)
+ if rm is None:
+ continue
+
+ conn_id_s = rm.group('conn_id')
+ conn_id = conn_id_s.replace('_', ':')
+ ftype = rm.group('type')
+
+ if ftype not in ('iops', 'bw', 'lat'):
+ continue
+
+ try:
+ ts = load_fio_log_file(os.path.join(folder, fname))
+ if ftype in res:
+ assert conn_id not in res[ftype]
+
+ res.setdefault(ftype, {})[conn_id] = ts
+ except AssertionError:
+ pass
+
+ conn_ids.add(conn_id)
+
+ raw_res = {}
+ for conn_id in conn_ids:
+        conn_id_s = conn_id.replace(':', '_')
+        fn = os.path.join(folder, "{0}_{1}_rawres.json".format(run_num, conn_id_s))
+
+ # remove message hack
+ fc = "{" + open(fn).read().split('{', 1)[1]
+ raw_res[conn_id] = json.loads(fc)
+
+ return cls(params, res, raw_res)
+
+
+class Attrmapper(object):
+ def __init__(self, dct):
+ self.__dct = dct
+
+ def __getattr__(self, name):
+ try:
+ return self.__dct[name]
+ except KeyError:
+ raise AttributeError(name)
+
+
+class DiskPerfInfo(object):
+ def __init__(self, name, summary, params, testnodes_count):
+ self.name = name
+ self.bw = None
+ self.iops = None
+ self.lat = None
+ self.lat_50 = None
+ self.lat_95 = None
+
+ self.raw_bw = []
+ self.raw_iops = []
+ self.raw_lat = []
+
+ self.params = params
+ self.testnodes_count = testnodes_count
+ self.summary = summary
+ self.p = Attrmapper(self.params['vals'])
+
+ self.sync_mode = get_test_sync_mode(self.params['vals'])
+ self.concurence = self.params['vals'].get('numjobs', 1)
+
+
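+# lat_mks maps a latency bucket (microseconds) to the percentage of
+# requests that hit it; the 50/95 percentiles are linearly interpolated
+# between bucket boundaries and returned in milliseconds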
+def get_lat_perc_50_95(lat_mks):
+ curr_perc = 0
+ perc_50 = None
+ perc_95 = None
+ pkey = None
+ for key, val in sorted(lat_mks.items()):
+ if curr_perc + val >= 50 and perc_50 is None:
+ if pkey is None or val < 1.:
+ perc_50 = key
+ else:
+ perc_50 = (50. - curr_perc) / val * (key - pkey) + pkey
+
+ if curr_perc + val >= 95:
+ if pkey is None or val < 1.:
+ perc_95 = key
+ else:
+ perc_95 = (95. - curr_perc) / val * (key - pkey) + pkey
+ break
+
+ pkey = key
+ curr_perc += val
+
+ return perc_50 / 1000., perc_95 / 1000.
+
+
+def prepare(ramp_time, data, avg_interval):
+ if data is None:
+ return data
+
+ res = {}
+ for key, ts_data in data.items():
+ if ramp_time > 0:
+ ts_data = ts_data.skip(ramp_time)
+
+ res[key] = ts_data.derived(avg_interval)
+ return res
+
+
+class IOTestResult(TestResults):
+ """
+ Fio run results
+ config: TestConfig
+ fio_task: FioJobSection
+ ts_results: {str: TimeSeriesValue}
+ raw_result: ????
+    run_interval:(float, float) - test run time, used for sensors
+ """
+ def __init__(self, config, fio_task, ts_results, raw_result, run_interval):
+
+ self.name = fio_task.name.split("_")[0]
+ self.fio_task = fio_task
+
+ ramp_time = fio_task.vals.get('ramp_time', 0)
+
+ self.bw = prepare(ramp_time, ts_results.get('bw'), 1.0)
+ self.lat = prepare(ramp_time, ts_results.get('lat'), 1.0)
+ self.iops = prepare(ramp_time, ts_results.get('iops'), 1.0)
+ # self.slat = drop_warmup(res.get('clat', None), self.params)
+ # self.clat = drop_warmup(res.get('slat', None), self.params)
+
+ res = {"bw": self.bw, "lat": self.lat, "iops": self.iops}
+
+ self.sensors_data = None
+ self._pinfo = None
+ TestResults.__init__(self, config, res, raw_result, run_interval)
+
+ def summary(self):
+ return get_test_summary(self.fio_task) + "vm" \
+ + str(len(self.config.nodes))
+
+ def get_yamable(self):
+ return self.summary()
+
+ @property
+ def disk_perf_info(self):
+ if self._pinfo is not None:
+ return self._pinfo
+
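+        # merge fio's per-job latency_ms/latency_us histograms into one
+        # microsecond-keyed histogram averaged over all jobs; ">=N"
+        # overflow buckets are counted at their lower bound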
+ lat_mks = collections.defaultdict(lambda: 0)
+ num_res = 0
+
+ for _, result in self.raw_result.items():
+ num_res += len(result['jobs'])
+ for job_info in result['jobs']:
+ for k, v in job_info['latency_ms'].items():
+ if isinstance(k, basestring) and k.startswith('>='):
+ lat_mks[int(k[2:]) * 1000] += v
+ else:
+ lat_mks[int(k) * 1000] += v
+
+ for k, v in job_info['latency_us'].items():
+ lat_mks[int(k)] += v
+
+ for k, v in lat_mks.items():
+ lat_mks[k] = float(v) / num_res
+
+        testnodes_count = len(self.raw_result)
+
+ pinfo = DiskPerfInfo(self.name,
+ self.summary(),
+ self.params,
+ testnodes_count)
+
+        pinfo.raw_bw = [res.values for res in self.bw.values()]
+        pinfo.raw_iops = [res.values for res in self.iops.values()]
+        pinfo.raw_lat = [res.values for res in self.lat.values()]
+
+ pinfo.bw = data_property(map(sum, zip(*pinfo.raw_bw)))
+ pinfo.iops = data_property(map(sum, zip(*pinfo.raw_iops)))
+ pinfo.lat = data_property(sum(pinfo.raw_lat, []))
+ pinfo.lat_50, pinfo.lat_95 = get_lat_perc_50_95(lat_mks)
+
+ self._pinfo = pinfo
+
+ return pinfo
+
+
+class IOPerfTest(PerfTest):
+ tcp_conn_timeout = 30
+ max_pig_timeout = 5
+ soft_runcycle = 5 * 60
+
+ def __init__(self, config):
+ PerfTest.__init__(self, config)
+
+ get = self.config.params.get
+ do_get = self.config.params.__getitem__
+
+ self.config_fname = do_get('cfg')
+
+ if '/' not in self.config_fname and '.' not in self.config_fname:
+ cfgs_dir = os.path.dirname(__file__)
+ self.config_fname = os.path.join(cfgs_dir,
+ self.config_fname + '.cfg')
+
+ self.alive_check_interval = get('alive_check_interval')
+ self.use_system_fio = get('use_system_fio', False)
+
+ self.config_params = get('params', {}).copy()
+
+ self.io_py_remote = self.join_remote("agent.py")
+ self.results_file = self.join_remote("results.json")
+ self.pid_file = self.join_remote("pid")
+ self.task_file = self.join_remote("task.cfg")
+ self.sh_file = self.join_remote("cmd.sh")
+ self.err_out_file = self.join_remote("fio_err_out")
+ self.exit_code_file = self.join_remote("exit_code")
+
+ self.use_sudo = get("use_sudo", True)
+ self.test_logging = get("test_logging", False)
+
+ self.raw_cfg = open(self.config_fname).read()
+ self.fio_configs = fio_cfg_compile(self.raw_cfg,
+ self.config_fname,
+ self.config_params,
+ split_on_names=self.test_logging)
+ self.fio_configs = list(self.fio_configs)
+
+ @classmethod
+ def load(cls, folder):
+ for fname in os.listdir(folder):
+            if re.match(r"\d+_params\.yaml$", fname):
+ num = int(fname.split('_')[0])
+ yield load_test_results(IOTestResult, folder, num)
+
+ def cleanup(self):
+ # delete_file(conn, self.io_py_remote)
+        # need to remove temp files used for testing
+ pass
+
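+    # sequentially rewrite every test file with fio, so measurements are
+    # not skewed by sparse or unallocated blocks; the timeout of one
+    # second per MiB assumes a worst-case fill rate of ~1 MiBps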
+ def prefill_test_files(self, files, rossh):
+ cmd_templ = "fio --name=xxx --filename={0} --direct=1" + \
+ " --bs=4m --size={1}m --rw=write"
+
+ if self.use_sudo:
+ cmd_templ = "sudo " + cmd_templ
+
+ ssize = 0
+ stime = time.time()
+
+ for fname, curr_sz in files.items():
+ cmd = cmd_templ.format(fname, curr_sz)
+ ssize += curr_sz
+
+ rossh(cmd, timeout=curr_sz)
+
+ ddtime = time.time() - stime
+ if ddtime > 1E-3:
+ fill_bw = int(ssize / ddtime)
+            mess = "Initial dd fill bw is {0} MiBps for this vm"
+ logger.info(mess.format(fill_bw))
+ return fill_bw
+
+ def install_utils(self, rossh, max_retry=3, timeout=5):
+ need_install = []
+ packs = [('screen', 'screen')]
+
+ if self.use_system_fio:
+ packs.append(('fio', 'fio'))
+ else:
+ # define OS and x32/x64
+ # copy appropriate fio
+ # add fio deps
+ pass
+
+ for bin_name, package in packs:
+ if bin_name is None:
+ need_install.append(package)
+ continue
+
+ try:
+ rossh('which ' + bin_name, nolog=True)
+ except OSError:
+ need_install.append(package)
+
+ if len(need_install) == 0:
+ return
+
+ if 'redhat' == get_os(rossh):
+ cmd = "sudo yum -y install " + " ".join(need_install)
+ else:
+ cmd = "sudo apt-get -y install " + " ".join(need_install)
+
+ for _ in range(max_retry):
+ try:
+ rossh(cmd)
+ break
+ except OSError as err:
+ time.sleep(timeout)
+ else:
+ raise OSError("Can't install - " + str(err))
+
+ def pre_run(self):
+        prefill = self.config.params.get('prefill_files', True)
+
+ if prefill:
+ files = {}
+ for cfg_slice in self.fio_configs:
+ for section in cfg_slice:
+ sz = ssize2b(section.vals['size'])
+ msz = sz / (1024 ** 2)
+
+ if sz % (1024 ** 2) != 0:
+ msz += 1
+
+ fname = section.vals['filename']
+
+ # if already has other test with the same file name
+ # take largest size
+ files[fname] = max(files.get(fname, 0), msz)
+ else:
+ files = None
+ logger.warning("Prefilling of test files is disabled")
+
+ with ThreadPoolExecutor(len(self.config.nodes)) as pool:
+ fc = functools.partial(self.pre_run_th, files=files)
+ list(pool.map(fc, self.config.nodes))
+
+ def pre_run_th(self, node, files):
+ # fill files with pseudo-random data
+ rossh = run_on_node(node)
+
+ try:
+ cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
+ if self.use_sudo:
+ cmd = "sudo " + cmd
+            cmd += " ; sudo chown {0} {1}".format(node.get_user(),
+                                                  self.config.remote_dir)
+
+ rossh(cmd)
+ except Exception as exc:
+ msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
+            msg = msg.format(self.config.remote_dir, node.get_conn_id(), exc)
+ logger.exception(msg)
+ raise StopTestError(msg, exc)
+
+ if files is not None:
+ self.prefill_test_files(rossh, files)
+
+ self.install_utils(rossh)
+
+ def run(self):
+ if len(self.fio_configs) > 1:
+ # +10% - is a rough estimation for additional operations
+ # like sftp, etc
+ exec_time = int(sum(map(execution_time, self.fio_configs)) * 1.1)
+ exec_time_s = sec_to_str(exec_time)
+ now_dt = datetime.datetime.now()
+ end_dt = now_dt + datetime.timedelta(0, exec_time)
+            msg = "Entire test should take around {0} and finish at {1}"
+ logger.info(msg.format(exec_time_s,
+ end_dt.strftime("%H:%M:%S")))
+
+ tname = os.path.basename(self.config_fname)
+ if tname.endswith('.cfg'):
+ tname = tname[:-4]
+
+ barrier = Barrier(len(self.config.nodes))
+ results = []
+
+ with ThreadPoolExecutor(len(self.config.nodes)) as pool:
+ for pos, fio_cfg in enumerate(self.fio_configs):
+ logger.info("Will run {0} test".format(fio_cfg.name))
+
+                templ = "Test should take about {0}." + \
+ " Should finish at {1}," + \
+ " will wait at most till {2}"
+ exec_time = execution_time(fio_cfg)
+ exec_time_str = sec_to_str(exec_time)
+ timeout = int(exec_time + max(300, exec_time))
+
+ now_dt = datetime.datetime.now()
+ end_dt = now_dt + datetime.timedelta(0, exec_time)
+ wait_till = now_dt + datetime.timedelta(0, timeout)
+
+ logger.info(templ.format(exec_time_str,
+ end_dt.strftime("%H:%M:%S"),
+ wait_till.strftime("%H:%M:%S")))
+
+ func = functools.partial(self.do_run,
+ barrier=barrier,
+ fio_cfg=fio_cfg,
+ pos=pos)
+
+ max_retr = 3
+ for idx in range(max_retr):
+ try:
+ intervals = list(pool.map(func, self.config.nodes))
+ break
+ except (EnvironmentError, SSHException) as exc:
+ logger.exception("During fio run")
+ if idx == max_retr - 1:
+ raise StopTestError("Fio failed", exc)
+
+ logger.info("Sleeping 30s and retrying")
+ time.sleep(30)
+
+ fname = "{0}_task.fio".format(pos)
+ with open(os.path.join(self.config.log_directory, fname), "w") as fd:
+ fd.write(str(fio_cfg))
+
+ params = {'vm_count': len(self.config.nodes)}
+ params['name'] = fio_cfg.name
+ params['vals'] = dict(fio_cfg.vals.items())
+ params['intervals'] = intervals
+ params['nodes'] = [node.get_conn_id() for node in self.config.nodes]
+
+ fname = "{0}_params.yaml".format(pos)
+ with open(os.path.join(self.config.log_directory, fname), "w") as fd:
+ fd.write(dumps(params))
+
+                res = load_test_results(IOTestResult, self.config.log_directory, pos)
+ results.append(res)
+ return results
+
+ def do_run(self, node, barrier, fio_cfg, pos, nolog=False):
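+        # flow: wipe the remote work dir, upload the fio job file and a
+        # wrapper script, wait on the barrier so all nodes start
+        # simultaneously, run fio in the background over ssh, then
+        # archive and download the results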
+ exec_folder = os.path.dirname(self.task_file)
+ bash_file = "#!/bin/bash\n" + \
+ "cd {exec_folder}\n" + \
+ "fio --output-format=json --output={out_file} " + \
+ "--alloc-size=262144 {job_file} " + \
+ " >{err_out_file} 2>&1 \n" + \
+ "echo $? >{res_code_file}\n"
+
+ bash_file = bash_file.format(out_file=self.results_file,
+ job_file=self.task_file,
+ err_out_file=self.err_out_file,
+ res_code_file=self.exit_code_file,
+ exec_folder=exec_folder)
+
+ run_on_node(node)("cd {0} ; rm -rf *".format(exec_folder), nolog=True)
+
+ with node.connection.open_sftp() as sftp:
+ save_to_remote(sftp, self.task_file, str(fio_cfg))
+ save_to_remote(sftp, self.sh_file, bash_file)
+
+ exec_time = execution_time(fio_cfg)
+
+ timeout = int(exec_time + max(300, exec_time))
+ soft_tout = exec_time
+
+ begin = time.time()
+
+        if self.use_sudo:
+ sudo = "sudo "
+ else:
+ sudo = ""
+
+ fnames_before = run_on_node(node)("ls -1 " + exec_folder, nolog=True)
+
+ barrier.wait()
+
+        task = BGSSHTask(node, self.use_sudo)
+ task.start(sudo + "bash " + self.sh_file)
+
+ while True:
+ try:
+ task.wait(soft_tout, timeout)
+ break
+ except paramiko.SSHException:
+ pass
+
+ try:
+ node.connection.close()
+                except Exception:
+ pass
+
+ reconnect(node.connection, node.conn_url)
+
+ end = time.time()
+ rossh = run_on_node(node)
+ fnames_after = rossh("ls -1 " + exec_folder, nolog=True)
+
+ conn_id = node.get_conn_id().replace(":", "_")
+ if not nolog:
+ logger.debug("Test on node {0} is finished".format(conn_id))
+
+ log_files_pref = []
+ if 'write_lat_log' in fio_cfg.vals:
+ fname = fio_cfg.vals['write_lat_log']
+ log_files_pref.append(fname + '_clat')
+ log_files_pref.append(fname + '_lat')
+ log_files_pref.append(fname + '_slat')
+
+ if 'write_iops_log' in fio_cfg.vals:
+ fname = fio_cfg.vals['write_iops_log']
+ log_files_pref.append(fname + '_iops')
+
+ if 'write_bw_log' in fio_cfg.vals:
+ fname = fio_cfg.vals['write_bw_log']
+ log_files_pref.append(fname + '_bw')
+
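+        # files that appeared during the run are fio logs; bucket them
+        # by type (iops/bw/lat) and per-client counter from the name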
+ files = collections.defaultdict(lambda: [])
+ all_files = [os.path.basename(self.results_file)]
+ new_files = set(fnames_after.split()) - set(fnames_before.split())
+ for fname in new_files:
+ if fname.endswith('.log') and fname.split('.')[0] in log_files_pref:
+ name, _ = os.path.splitext(fname)
+ if fname.count('.') == 1:
+ tp = name.split("_")[-1]
+ cnt = 0
+ else:
+ tp_cnt = name.split("_")[-1]
+ tp, cnt = tp_cnt.split('.')
+ files[tp].append((int(cnt), fname))
+ all_files.append(fname)
+
+ arch_name = self.join_remote('wally_result.tar.gz')
+ tmp_dir = os.path.join(self.config.log_directory, 'tmp_' + conn_id)
+ os.mkdir(tmp_dir)
+ loc_arch_name = os.path.join(tmp_dir, 'wally_result.{0}.tar.gz'.format(conn_id))
+ file_full_names = " ".join(all_files)
+
+ try:
+ os.unlink(loc_arch_name)
+        except OSError:
+ pass
+
+ with node.connection.open_sftp() as sftp:
+ exit_code = read_from_remote(sftp, self.exit_code_file)
+ err_out = read_from_remote(sftp, self.err_out_file)
+ exit_code = exit_code.strip()
+
+ if exit_code != '0':
+ msg = "fio exit with code {0}: {1}".format(exit_code, err_out)
+ logger.critical(msg.strip())
+ raise StopTestError("fio failed")
+
+ rossh("rm -f {0}".format(arch_name), nolog=True)
+ cmd = "cd {0} ; tar zcvf {1} {2}".format(exec_folder, arch_name, file_full_names)
+ rossh(cmd, nolog=True)
+ sftp.get(arch_name, loc_arch_name)
+
+ cmd = "cd {0} ; tar xvzf {1} >/dev/null".format(tmp_dir, loc_arch_name)
+ subprocess.check_call(cmd, shell=True)
+ os.unlink(loc_arch_name)
+
+ for ftype, fls in files.items():
+ for idx, fname in fls:
+ cname = os.path.join(tmp_dir, fname)
+ loc_fname = "{0}_{1}_{2}.{3}.log".format(pos, conn_id, ftype, idx)
+ loc_path = os.path.join(self.config.log_directory, loc_fname)
+ os.rename(cname, loc_path)
+
+ cname = os.path.join(tmp_dir,
+ os.path.basename(self.results_file))
+ loc_fname = "{0}_{1}_rawres.json".format(pos, conn_id)
+ loc_path = os.path.join(self.config.log_directory, loc_fname)
+ os.rename(cname, loc_path)
+
+ os.rmdir(tmp_dir)
+ return begin, end
+
+ @classmethod
+ def format_for_console(cls, data, dinfo):
+ """
+ create a table with io performance report
+ for console
+ """
+
+ def getconc(data):
+ th_count = data.params['vals'].get('numjobs')
+
+ if th_count is None:
+ th_count = data.params['vals'].get('concurence', 1)
+ return th_count
+
+ def key_func(data):
+ p = data.params['vals']
+
+ th_count = getconc(data)
+
+ return (data.name.rsplit("_", 1)[0],
+ p['rw'],
+ get_test_sync_mode(data.params),
+ ssize2b(p['blocksize']),
+ int(th_count) * data.testnodes_count)
+
+ tab = texttable.Texttable(max_width=120)
+ tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
+ tab.set_cols_align(["l", "l", "r", "r", "r", "r", "r", "r", "r"])
+
+ items = sorted(dinfo.values(), key=key_func)
+
+ prev_k = None
+ header = ["Name", "Description", "iops\ncum", "KiBps\ncum",
+ "Cnf\n95%", "Dev%", "iops\nper vm", "KiBps\nper vm", "lat\nms"]
+
+ for data in items:
+ curr_k = key_func(data)[:4]
+
+ if prev_k is not None:
+ if prev_k != curr_k:
+ tab.add_row(
+ ["-------", "-----------", "-----", "------",
+ "---", "----", "------", "---", "-----"])
+
+ prev_k = curr_k
+
+ test_dinfo = dinfo[(data.name, data.summary)]
+
+ iops, _ = test_dinfo.iops.rounded_average_conf()
+
+ bw, bw_conf = test_dinfo.bw.rounded_average_conf()
+ _, bw_dev = test_dinfo.bw.rounded_average_dev()
+ conf_perc = int(round(bw_conf * 100 / bw))
+ dev_perc = int(round(bw_dev * 100 / bw))
+
+ lat, _ = test_dinfo.lat.rounded_average_conf()
+ lat = round_3_digit(int(lat) // 1000)
+
+ iops_per_vm = round_3_digit(iops / data.testnodes_count)
+ bw_per_vm = round_3_digit(bw / data.testnodes_count)
+
+ iops = round_3_digit(iops)
+ bw = round_3_digit(bw)
+
+ params = (data.name.rsplit('_', 1)[0],
+ data.summary, int(iops), int(bw), str(conf_perc),
+ str(dev_perc),
+ int(iops_per_vm), int(bw_per_vm), lat)
+ tab.add_row(params)
+
+ tab.header(header)
+
+ return tab.draw()
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index ade0028..e8ec6f9 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -305,8 +305,13 @@
def get_test_sync_mode(sec):
- is_sync = str(sec.vals.get("sync", "0")) == "1"
- is_direct = str(sec.vals.get("direct", "0")) == "1"
+ if isinstance(sec, dict):
+ vals = sec
+ else:
+ vals = sec.vals
+
+ is_sync = str(vals.get("sync", "0")) == "1"
+ is_direct = str(vals.get("direct", "0")) == "1"
if is_sync and is_direct:
return 'x'
@@ -319,23 +324,28 @@
def get_test_summary(sec):
+ if isinstance(sec, dict):
+ vals = sec
+ else:
+ vals = sec.vals
+
rw = {"randread": "rr",
"randwrite": "rw",
"read": "sr",
"write": "sw",
"randrw": "rm",
"rw": "sm",
- "readwrite": "sm"}[sec.vals["rw"]]
+ "readwrite": "sm"}[vals["rw"]]
sync_mode = get_test_sync_mode(sec)
- th_count = sec.vals.get('numjobs')
+ th_count = vals.get('numjobs')
if th_count is None:
- th_count = sec.vals.get('concurence', 1)
+ th_count = vals.get('concurence', 1)
return "{0}{1}{2}th{3}".format(rw,
sync_mode,
- sec.vals['blocksize'],
+ vals['blocksize'],
th_count)
@@ -343,65 +353,6 @@
return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
-def slice_config(sec_iter, runcycle=None, max_jobs=1000, split_on_names=False):
- jcount = 0
- runtime = 0
- curr_slice = []
- prev_name = None
-
- for pos, sec in enumerate(sec_iter):
-
- if prev_name is not None:
- split_here = False
-
- if split_on_names and prev_name != sec.name:
- split_here = True
-
- if split_here:
- yield curr_slice
- curr_slice = []
- runtime = 0
- jcount = 0
-
- prev_name = sec.name
-
- jc = sec.vals.get('numjobs', 1)
- msg = "numjobs should be integer, not {0!r}".format(jc)
- assert isinstance(jc, int), msg
-
- curr_task_time = execution_time(sec)
-
- if jc > max_jobs:
- err_templ = "Can't process job {0!r} - too large numjobs"
- raise ValueError(err_templ.format(sec.name))
-
- if runcycle is not None and len(curr_slice) != 0:
- rc_ok = curr_task_time + runtime <= runcycle
- else:
- rc_ok = True
-
- if jc + jcount <= max_jobs and rc_ok:
- runtime += curr_task_time
- jcount += jc
- curr_slice.append(sec)
- continue
-
- assert len(curr_slice) != 0
- yield curr_slice
-
- if '_ramp_time' in sec.vals:
- sec.vals['ramp_time'] = sec.vals.pop('_ramp_time')
- curr_task_time = execution_time(sec)
-
- runtime = curr_task_time
- jcount = jc
- curr_slice = [sec]
- prev_name = None
-
- if curr_slice != []:
- yield curr_slice
-
-
def parse_all_in_1(source, fname=None):
return fio_config_parse(fio_config_lexer(source, fname))
@@ -417,8 +368,7 @@
it = (apply_params(sec, test_params) for sec in it)
it = flatmap(process_cycles, it)
it = flatmap(process_repeats, it)
- it = itertools.imap(finall_process, it)
- return slice_config(it, **slice_params)
+ return itertools.imap(finall_process, it)
def parse_args(argv):
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index 78e1f0e..42ce09f 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -2,8 +2,8 @@
include defaults.cfg
size={TEST_FILE_SIZE}
-ramp_time=5
-runtime=40
+ramp_time=0
+runtime=5
# ---------------------------------------------------------------------
[rws_{TEST_SUMM}]
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 92c78e5..35cf48a 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -2,20 +2,21 @@
include defaults.cfg
size={TEST_FILE_SIZE}
-ramp_time=5
-runtime=10
+ramp_time=90
+runtime=600
# ---------------------------------------------------------------------
[verify_{TEST_SUMM}]
blocksize=4k
rw=randwrite
direct=1
+# numjobs=5
# ---------------------------------------------------------------------
-# [verify_{TEST_SUMM}]
-# blocksize=4k
-# rw=randwrite
-# sync=1
+[verify_{TEST_SUMM}]
+blocksize=4k
+rw=randwrite
+sync=1
# ---------------------------------------------------------------------
# [verify_{TEST_SUMM}]
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 1cbf88c..7564722 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -1,104 +1,56 @@
import abc
+import time
+import logging
import os.path
import functools
+from concurrent.futures import ThreadPoolExecutor
+from wally.utils import Barrier, StopTestError
+from wally.statistic import data_property
from wally.ssh_utils import run_over_ssh, copy_paths
-def cached_prop(func):
- @property
- @functools.wraps(func)
- def closure(self):
- val = getattr(self, "_" + func.__name__)
- if val is NoData:
- val = func(self)
- setattr(self, "_" + func.__name__, val)
- return val
- return closure
+logger = logging.getLogger("wally")
-class NoData(object):
- pass
+class TestConfig(object):
+ """
+    This class describes the test input configuration.
-
-class VMThData(object):
- "store set of values for VM_COUNT * TH_COUNT"
-
-
-class IOTestResult(object):
- def __init__(self):
- self.run_config = None
- self.suite_config = None
- self.run_interval = None
-
- self.bw = None
- self.lat = None
- self.iops = None
- self.slat = None
- self.clat = None
-
- self.fio_section = None
-
- self._lat_log = NoData
- self._iops_log = NoData
- self._bw_log = NoData
-
- self._sensors_data = NoData
- self._raw_resuls = NoData
-
- def to_jsonable(self):
- pass
-
- @property
- def thread_count(self):
- pass
-
- @property
- def sync_mode(self):
- pass
-
- @property
- def abbrev_name(self):
- pass
-
- @property
- def full_name(self):
- pass
-
- @cached_prop
- def lat_log(self):
- pass
-
- @cached_prop
- def iops_log(self):
- pass
-
- @cached_prop
- def bw_log(self):
- pass
-
- @cached_prop
- def sensors_data(self):
- pass
-
- @cached_prop
- def raw_resuls(self):
- pass
+ test_type:str - test type name
+ params:{str:Any} - parameters from yaml file for this test
+ test_uuid:str - UUID to be used to create filenames and Co
+ log_directory:str - local directory to store results
+    nodes:[Node] - nodes to run tests on
+ remote_dir:str - directory on nodes to be used for local files
+ """
+ def __init__(self, test_type, params, test_uuid, nodes,
+ log_directory, remote_dir):
+ self.test_type = test_type
+ self.params = params
+ self.test_uuid = test_uuid
+ self.log_directory = log_directory
+ self.nodes = nodes
+ self.remote_dir = remote_dir
class TestResults(object):
- def __init__(self, config, params, results,
- raw_result, run_interval, vm_count,
- test_name, **attrs):
+ """
+    This class describes test results.
+
+ config:TestConfig - test config object
+ params:dict - parameters from yaml file for this test
+    results:{str:MeasurementMatrix} - test results object
+ raw_result:Any - opaque object to store raw results
+    run_interval:(float, float) - test run time, used for sensors
+ """
+ def __init__(self, config, results, raw_result, run_interval):
self.config = config
- self.params = params
+ self.params = config.params
self.results = results
self.raw_result = raw_result
self.run_interval = run_interval
- self.vm_count = vm_count
- self.test_name = test_name
- self.__dict__.update(attrs)
def __str__(self):
res = "{0}({1}):\n results:\n".format(
@@ -124,100 +76,209 @@
pass
-class IPerfTest(object):
- def __init__(self, options, is_primary, on_result_cb, test_uuid, node,
- total_nodes_count,
- log_directory=None,
- coordination_queue=None,
- remote_dir="/tmp/wally"):
- self.options = options
- self.on_result_cb = on_result_cb
- self.log_directory = log_directory
- self.node = node
- self.test_uuid = test_uuid
- self.coordination_queue = coordination_queue
- self.remote_dir = remote_dir
- self.is_primary = is_primary
+class MeasurementMatrix(object):
+ """
+    data:[[MeasurementResults]] - VM_COUNT x TH_COUNT matrix of MeasurementResults
+ """
+ def __init__(self, data):
+ self.data = data
+
+ def per_vm(self):
+ return self.data
+
+ def per_th(self):
+ return sum(self.data, [])
+
+
+class MeasurementResults(object):
+ def stat(self):
+ return data_property(self.data)
+
+ def __str__(self):
+ return 'TS([' + ", ".join(map(str, self.data)) + '])'
+
+
+class SimpleVals(MeasurementResults):
+ """
+ data:[float] - list of values
+ """
+ def __init__(self, data):
+ self.data = data
+
+
+class TimeSeriesValue(MeasurementResults):
+ """
+    values:[(float, float, float)] - list of (start_time, length, average_value_for_interval)
+ """
+ def __init__(self, data):
+ assert len(data) > 0
+ data = [(0, 0)] + data
+
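+        # each input point (t, v) describes the interval ending at t;
+        # turn consecutive points into (start, length, value) triples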
+        self.data = []
+        for (cstart, cval), (nstart, nval) in zip(data[:-1], data[1:]):
+            self.data.append((cstart, nstart - cstart, nval))
+
+ @property
+ def values(self):
+ return [val[2] for val in self.data]
+
+ def skip(self, seconds):
+ nres = []
+        for start, ln, val in self.data:
+ if start + ln < seconds:
+ continue
+ elif start > seconds:
+ nres.append([start + ln - seconds, val])
+ else:
+ nres.append([0, val])
+ return self.__class__(nres)
+
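+    # resample the series onto a uniform grid of tdelta-second bins,
+    # time-weighting values that span bin boundaries; a trailing,
+    # barely-filled bin is dropped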
+ def derived(self, tdelta):
+ end = tdelta
+ res = [[end, 0.0]]
+ tdelta = float(tdelta)
+
+        for start, length, val in self.data:
+            if start < end:
+                ln = min(end, start + length) - start
+                res[-1][1] += val * ln / tdelta
+
+            if end <= start + length:
+                end += tdelta
+                res.append([end, 0.0])
+                while end < start + length:
+                    res[-1][1] = val
+                    res.append([end, 0.0])
+                    end += tdelta
+
+ if res[-1][1] < 1:
+ res = res[:-1]
+
+ return self.__class__(res)
+
+
+class PerfTest(object):
+ """
+ Very base class for tests
+ config:TestConfig - test configuration
+ stop_requested:bool - stop for test requested
+ """
+ def __init__(self, config):
+ self.config = config
self.stop_requested = False
- self.total_nodes_count = total_nodes_count
def request_stop(self):
self.stop_requested = True
def join_remote(self, path):
- return os.path.join(self.remote_dir, path)
-
- def coordinate(self, data):
- if self.coordination_queue is not None:
- self.coordination_queue.put((self.node.get_conn_id(), data))
-
- def pre_run(self):
- pass
-
- def cleanup(self):
- pass
+ return os.path.join(self.config.remote_dir, path)
@classmethod
@abc.abstractmethod
- def load(cls, data):
+ def load(cls, path):
pass
@abc.abstractmethod
- def run(self, barrier):
+ def run(self):
pass
- @classmethod
+ @abc.abstractmethod
def format_for_console(cls, data):
- msg = "{0}.format_for_console".format(cls.__name__)
- raise NotImplementedError(msg)
-
- def run_over_ssh(self, cmd, **kwargs):
- return run_over_ssh(self.node.connection, cmd,
- node=self.node.get_conn_id(), **kwargs)
-
- @classmethod
- def coordination_th(cls, coord_q, barrier, num_threads):
pass
-class TwoScriptTest(IPerfTest):
- def __init__(self, *dt, **mp):
- IPerfTest.__init__(self, *dt, **mp)
+def run_on_node(node):
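+    # bind run_over_ssh to this node, so the result can be called like
+    # a local shell, e.g. run_on_node(node)("ls -1 /tmp", nolog=True)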
+ def closure(*args, **kwargs):
+ return run_over_ssh(node.connection,
+ *args,
+ node=node.get_conn_id(),
+ **kwargs)
+ return closure
- if 'scripts_path' in self.options:
- self.root = self.options['scripts_path']
- self.run_script = self.options['run_script']
- self.prerun_script = self.options['prerun_script']
+
+class ThreadedTest(PerfTest):
+ """
+ Base class for tests, which spawn separated thread for each node
+ """
+
+ def run(self):
+        barrier = Barrier(len(self.config.nodes))
+ th_test_func = functools.partial(self.th_test_func, barrier)
+
+        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
+ return list(pool.map(th_test_func, self.config.nodes))
+
+ @abc.abstractmethod
+ def do_test(self, node):
+ pass
+
+    def th_test_func(self, barrier, node):
+        logger.debug("Starting {0} test on {1} node".format(self.__class__.__name__,
+                                                            node.conn_url))
+
+        logger.debug("Run preparation for {0}".format(node.get_conn_id()))
+        self.pre_run(node)
+        barrier.wait()
+
+        exc = None
+        res = None
+        try:
+            logger.debug("Run test for {0}".format(node.get_conn_id()))
+            res = self.do_test(node)
+        except StopTestError as exc:
+            pass
+        except Exception as exc:
+            msg = "In test {0} for node {1}".format(self, node.get_conn_id())
+            logger.exception(msg)
+            exc = StopTestError(msg, exc)
+
+        try:
+            self.cleanup(node)
+        except StopTestError as exc1:
+            if exc is None:
+                exc = exc1
+        except Exception as exc1:
+            if exc is None:
+                msg = "During cleanup - in test {0} for node {1}".format(self, node)
+                logger.exception(msg)
+                exc = StopTestError(msg, exc1)
+
+        if exc is not None:
+            raise exc
+
+        return res
+
+ def pre_run(self, node):
+ pass
+
+ def cleanup(self, node):
+ pass
+
+
+class TwoScriptTest(ThreadedTest):
+ def __init__(self, *dt, **mp):
+ ThreadedTest.__init__(self, *dt, **mp)
+
+ self.prerun_script = self.config.params['prerun_script']
+ self.run_script = self.config.params['run_script']
+
+ self.prerun_tout = self.config.params.get('prerun_tout', 3600)
+ self.run_tout = self.config.params.get('run_tout', 3600)
def get_remote_for_script(self, script):
- return os.path.join(self.remote_dir, script.rpartition('/')[2])
+        return os.path.join(self.config.remote_dir,
+                            os.path.basename(script))
- def pre_run(self):
- copy_paths(self.node.connection, {self.root: self.remote_dir})
+ def pre_run(self, node):
+ copy_paths(node.connection,
+ {
+ self.run_script: self.get_remote_for_script(self.run_script),
+ self.prerun_script: self.get_remote_for_script(self.prerun_script),
+ })
+
-        cmd = self.get_remote_for_script(self.pre_run_script)
+        cmd = self.get_remote_for_script(self.prerun_script)
- self.run_over_ssh(cmd, timeout=2000)
+ cmd += ' ' + self.config.params.get('prerun_opts', '')
+ run_on_node(node)(cmd, timeout=self.prerun_tout)
- def run(self, barrier):
- remote_script = self.get_remote_for_script(self.run_script)
- cmd_opts = ' '.join(["%s %s" % (key, val) for key, val
- in self.options.items()])
- cmd = remote_script + ' ' + cmd_opts
- out_err = self.run_over_ssh(cmd, timeout=6000)
- self.on_result(out_err, cmd)
-
- def parse_results(self, out):
- for line in out.split("\n"):
- key, separator, value = line.partition(":")
- if key and value:
- self.on_result_cb((key, float(value)))
-
- def on_result(self, out_err, cmd):
- try:
- self.parse_results(out_err)
- except Exception as exc:
- msg_templ = "Error during postprocessing results: {0!s}. {1}"
- raise RuntimeError(msg_templ.format(exc, out_err))
-
- def merge_results(self, results):
- tpcm = sum([val[1] for val in results])
- return {"res": {"TpmC": tpcm}}
+    def do_test(self, node):
+ cmd = self.get_remote_for_script(self.run_script)
+ cmd += ' ' + self.config.params.get('run_opts', '')
+ t1 = time.time()
+ res = run_on_node(node)(cmd, timeout=self.run_tout)
+ t2 = time.time()
+ return TestResults(self.config, None, res, (t1, t2))