add lat/bw limits, fio binaries, fix bugs, fix latency calculations, etc
diff --git a/wally/hw_info.py b/wally/hw_info.py
index 5819eed..73226e3 100644
--- a/wally/hw_info.py
+++ b/wally/hw_info.py
@@ -20,11 +20,14 @@
# real disks on raid controller
self.disks_raw_info = {}
+ # name => (speed, is_full_duplex, ip_addresses)
self.net_info = {}
+
self.ram_size = 0
self.sys_name = None
self.mb = None
self.raw = None
+
self.storage_controllers = []
def get_HDD_count(self):
@@ -89,7 +92,7 @@
if self.net_info != {}:
res.append("Net adapters:")
- for name, (speed, dtype) in self.net_info.items():
+ for name, (speed, dtype, _) in self.net_info.items():
res.append(" {0} {2} duplex={1}".format(name, dtype, speed))
else:
res.append("Net adapters: Failed to get net info")
@@ -111,6 +114,7 @@
def get_sw_info(conn):
res = SWInfo()
+ res.OS_version = utils.get_os()
with conn.open_sftp() as sftp:
def get(fname):
@@ -121,7 +125,6 @@
res.kernel_version = get('/proc/version')
res.partitions = get('/etc/mtab')
- res.OS_version = get('/etc/lsb-release')
def rr(cmd):
try:
@@ -136,6 +139,10 @@
return res
+def get_network_info():
+ pass
+
+
def get_hw_info(conn):
res = HWInfo()
lshw_out = ssh_utils.run_over_ssh(conn, 'sudo lshw -xml 2>/dev/null',
@@ -215,7 +222,7 @@
else:
dup = dup_node.attrib['value']
- res.net_info[name] = (speed, dup)
+ res.net_info[name] = (speed, dup, [])
except:
pass
diff --git a/wally/start_vms.py b/wally/start_vms.py
index c533162..471996a 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -75,6 +75,7 @@
serv_groups = " ".join(map(params['aa_group_name'].format,
range(MAX_VM_PER_NODE)))
+ image_name = params['image']['name']
env = os.environ.copy()
env.update(dict(
OS_USERNAME=name,
@@ -92,7 +93,7 @@
SECGROUP=params['security_group'],
- IMAGE_NAME=params['image']['name'],
+ IMAGE_NAME=image_name,
KEY_FILE_NAME=params['keypair_file_private'],
IMAGE_URL=params['image']['url'],
))
@@ -105,11 +106,11 @@
conn = nova_connect(name, passwd, tenant, auth_url)
while True:
- status = conn.images.find(name='wally_ubuntu').status
+ status = conn.images.find(name=image_name).status
if status == 'ACTIVE':
break
msg = "Image {0} is still in {1} state. Waiting 10 more seconds"
- logger.info(msg.format('wally_ubuntu', status))
+ logger.info(msg.format(image_name, status))
time.sleep(10)
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index fc96a52..bf410bb 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -2,7 +2,8 @@
include defaults.cfg
NUMJOBS={% 1, 5, 10, 15, 40 %}
-NUMJOBS_SHORT={% 1, 2, 3, 10 %}
+# NUMJOBS_SHORT={% 1, 2, 3, 10 %}
+NUMJOBS_SHORT=1
ramp_time=15
runtime=120
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 2a76fc2..c64523d 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,6 +1,7 @@
import re
import time
import json
+import random
import os.path
import logging
import datetime
@@ -14,6 +15,7 @@
from paramiko.ssh_exception import SSHException
from concurrent.futures import ThreadPoolExecutor
+import wally
from wally.pretty_yaml import dumps
from wally.statistic import round_3_digit, data_property, average
from wally.utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
@@ -96,16 +98,16 @@
mm_res[key] = MeasurementMatrix(matr, conn_ids)
- iops_from_lat_matr = []
- for node_ts in mm_res['lat'].data:
- iops_from_lat_matr.append([])
- for thread_ts in node_ts:
- ndt = [(start + ln, 1000000. / val)
- for (start, ln, val) in thread_ts.data]
- new_ts = TimeSeriesValue(ndt)
- iops_from_lat_matr[-1].append(new_ts)
+ # iops_from_lat_matr = []
+ # for node_ts in mm_res['lat'].data:
+ # iops_from_lat_matr.append([])
+ # for thread_ts in node_ts:
+ # ndt = [(start + ln, 1000000. / val)
+ # for (start, ln, val) in thread_ts.data]
+ # new_ts = TimeSeriesValue(ndt)
+ # iops_from_lat_matr[-1].append(new_ts)
- mm_res['iops_from_lat'] = MeasurementMatrix(iops_from_lat_matr, conn_ids)
+ # mm_res['iops_from_lat'] = MeasurementMatrix(iops_from_lat_matr, conn_ids)
raw_res = {}
for conn_id in conn_ids:
@@ -197,7 +199,7 @@
self.bw = ts_results.get('bw')
self.lat = ts_results.get('lat')
self.iops = ts_results.get('iops')
- self.iops_from_lat = ts_results.get('iops_from_lat')
+ # self.iops_from_lat = ts_results.get('iops_from_lat')
# self.slat = drop_warmup(res.get('clat', None), self.params)
# self.clat = drop_warmup(res.get('slat', None), self.params)
@@ -220,13 +222,10 @@
total_bytes = [self.raw_result[node]['jobs'][0]['mixed']['io_bytes'] for node in nodes]
flt_bw = [float(tbytes) / rtime for tbytes, rtime in zip(total_bytes, runtime)]
- lat = [self.raw_result[node]['jobs'][0]['mixed']['lat'] for node in nodes]
-
return {'iops': iops,
'flt_iops': flt_iops,
'bw': bw,
- 'flt_bw': flt_bw,
- 'lat': lat}
+ 'flt_bw': flt_bw}
def summary(self):
return get_test_summary(self.fio_task) + "vm" \
@@ -235,15 +234,11 @@
def get_yamable(self):
return self.summary()
- def disk_perf_info(self, avg_interval=5.0):
-
- if self._pinfo is not None:
- return self._pinfo
-
+ def get_lat_perc_50_95_multy(self):
lat_mks = collections.defaultdict(lambda: 0)
num_res = 0
- for _, result in self.raw_result.items():
+ for result in self.raw_result.values():
num_res += len(result['jobs'])
for job_info in result['jobs']:
for k, v in job_info['latency_ms'].items():
@@ -257,6 +252,12 @@
for k, v in lat_mks.items():
lat_mks[k] = float(v) / num_res
+ return get_lat_perc_50_95(lat_mks)
+
+ def disk_perf_info(self, avg_interval=2.0):
+
+ if self._pinfo is not None:
+ return self._pinfo
testnodes_count = len(self.config.nodes)
@@ -267,7 +268,7 @@
# ramp_time = self.fio_task.vals.get('ramp_time', 0)
- def prepare(data):
+ def prepare(data, drop=1):
if data is None:
return data
@@ -279,7 +280,15 @@
if ts_data.average_interval() < avg_interval:
ts_data = ts_data.derived(avg_interval)
- res.append(ts_data.values)
+ # drop the last value(s) at the end of the series,
+ # as they may contain ranges without activity
+ assert len(ts_data.values) >= drop + 1
+
+ if drop > 0:
+ res.append(ts_data.values[:-drop])
+ else:
+ res.append(ts_data.values)
+
return res
def agg_data(matr):
@@ -290,11 +299,12 @@
res.append(sum(dt[idx] for dt in arr))
return res
- pinfo.raw_lat = map(prepare, self.lat.per_vm())
- num_th = sum(map(len, pinfo.raw_lat))
- avg_lat = [val / num_th for val in agg_data(pinfo.raw_lat)]
- pinfo.lat = data_property(avg_lat)
- pinfo.lat_50, pinfo.lat_95 = get_lat_perc_50_95(lat_mks)
+ # pinfo.raw_lat = map(prepare, self.lat.per_vm())
+ # num_th = sum(map(len, pinfo.raw_lat))
+ # avg_lat = [val / num_th for val in agg_data(pinfo.raw_lat)]
+ # pinfo.lat = data_property(avg_lat)
+ pinfo.lat_50, pinfo.lat_95 = self.get_lat_perc_50_95_multy()
+ pinfo.lat = pinfo.lat_50
pinfo.raw_bw = map(prepare, self.bw.per_vm())
pinfo.raw_iops = map(prepare, self.iops.per_vm())
@@ -360,6 +370,10 @@
self.alive_check_interval = get('alive_check_interval')
self.use_system_fio = get('use_system_fio', False)
+ if get('prefill_files') is not None:
+ logger.warning("prefill_files option is depricated. Use force_prefill instead")
+
+ self.force_prefill = get('force_prefill', False)
self.config_params = get('params', {}).copy()
self.io_py_remote = self.join_remote("agent.py")
@@ -371,7 +385,7 @@
self.exit_code_file = self.join_remote("exit_code")
self.max_latency = get("max_lat", None)
- self.min_bw_per_thread = get("max_bw", None)
+ self.min_bw_per_thread = get("min_bw", None)
self.use_sudo = get("use_sudo", True)
self.test_logging = get("test_logging", False)
@@ -395,40 +409,67 @@
# Need to remove tempo files, used for testing
pass
- def prefill_test_files(self, files, rossh):
- cmd_templ = "fio --name=xxx --filename={0} --direct=1" + \
- " --bs=4m --size={1}m --rw=write"
+ def check_prefill_required(self, rossh, fname, size, num_blocks=16):
+ cmd = """python -c "import sys; fd = open('{0}', 'rb');""" + \
+ """fd.seek({1}); sys.stdout.write(fc.read(1024))" | md5sum"""
+
+ if self.use_sudo:
+ cmd = "sudo " + cmd
+
+ zero_md5 = '54ac58cc1e2711a1a3d88bce15bb152d'
+
+ for _ in range(num_blocks):
+ offset = random.randrange(size * 1024)
+ data = rossh(cmd.format(fname, offset), nolog=True)
+ if zero_md5 == data.split()[0].strip():
+ return True
+ return False
+
+ def prefill_test_files(self, rossh, files, force=False):
+ if self.use_system_fio:
+ cmd_templ = "fio "
+ else:
+ cmd_templ = "{0}/fio ".format(self.config.remote_dir)
if self.use_sudo:
cmd_templ = "sudo " + cmd_templ
- ssize = 0
- stime = time.time()
+ cmd_templ += "--name=xxx --filename={0} --direct=1" + \
+ " --bs=4m --size={1}m --rw=write"
+ ssize = 0
+
+ if force:
+ logger.info("File prefilling is forced")
+
+ ddtime = 0
for fname, curr_sz in files.items():
+ if not force:
+ if not self.check_prefill_required(rossh, fname, curr_sz):
+ continue
+
+ logger.info("Prefilling file {0}".format(fname))
cmd = cmd_templ.format(fname, curr_sz)
ssize += curr_sz
+ stime = time.time()
rossh(cmd, timeout=curr_sz)
+ ddtime += time.time() - stime
- ddtime = time.time() - stime
- if ddtime > 1E-3:
+ if ddtime > 1.0:
fill_bw = int(ssize / ddtime)
- mess = "Initiall dd fill bw is {0} MiBps for this vm"
+ mess = "Initiall fio fill bw is {0} MiBps for this vm"
logger.info(mess.format(fill_bw))
- return fill_bw
- def install_utils(self, rossh, max_retry=3, timeout=5):
+ def install_utils(self, node, rossh, max_retry=3, timeout=5):
need_install = []
packs = [('screen', 'screen')]
+ os_info = get_os(rossh)
if self.use_system_fio:
packs.append(('fio', 'fio'))
else:
- # define OS and x32/x64
- # copy appropriate fio
- # add fio deps
- pass
+ packs.append(('bzip2', 'bzip2'))
for bin_name, package in packs:
if bin_name is None:
@@ -440,50 +481,60 @@
except OSError:
need_install.append(package)
- if len(need_install) == 0:
- return
+ if len(need_install) != 0:
+ if 'redhat' == os_info.distro:
+ cmd = "sudo yum -y install " + " ".join(need_install)
+ else:
+ cmd = "sudo apt-get -y install " + " ".join(need_install)
- if 'redhat' == get_os(rossh):
- cmd = "sudo yum -y install " + " ".join(need_install)
- else:
- cmd = "sudo apt-get -y install " + " ".join(need_install)
+ for _ in range(max_retry):
+ try:
+ rossh(cmd)
+ break
+ except OSError as err:
+ time.sleep(timeout)
+ else:
+ raise OSError("Can't install - " + str(err))
- for _ in range(max_retry):
- try:
- rossh(cmd)
- break
- except OSError as err:
- time.sleep(timeout)
- else:
- raise OSError("Can't install - " + str(err))
+ if not self.use_system_fio:
+ fio_dir = os.path.dirname(os.path.dirname(wally.__file__))
+ fio_dir = os.path.join(os.getcwd(), fio_dir)
+ fio_dir = os.path.join(fio_dir, 'fio_binaries')
+ fname = 'fio_{0.release}_{0.arch}.bz2'.format(os_info)
+ fio_path = os.path.join(fio_dir, fname)
+
+ if not os.path.exists(fio_path):
+ raise RuntimeError("No prebuild fio available for {0}".format(os_info))
+
+ bz_dest = self.join_remote('fio.bz2')
+ with node.connection.open_sftp() as sftp:
+ sftp.put(fio_path, bz_dest)
+
+ rossh("bzip2 --decompress " + bz_dest, nolog=True)
+ rossh("chmod a+x " + self.join_remote("fio"), nolog=True)
def pre_run(self):
- prefill = self.config.options.get('prefill_files', True)
+ files = {}
+ for section in self.fio_configs:
+ sz = ssize2b(section.vals['size'])
+ msz = sz / (1024 ** 2)
- if prefill:
- files = {}
- for cfg_slice in self.fio_configs:
- for section in cfg_slice:
- sz = ssize2b(section.vals['size'])
- msz = sz / (1024 ** 2)
+ if sz % (1024 ** 2) != 0:
+ msz += 1
- if sz % (1024 ** 2) != 0:
- msz += 1
+ fname = section.vals['filename']
- fname = section.vals['filename']
-
- # if already has other test with the same file name
- # take largest size
- files[fname] = max(files.get(fname, 0), msz)
- else:
- files = None
- logger.warning("Prefilling of test files is disabled")
+ # if already has other test with the same file name
+ # take largest size
+ files[fname] = max(files.get(fname, 0), msz)
with ThreadPoolExecutor(len(self.config.nodes)) as pool:
- fc = functools.partial(self.pre_run_th, files=files)
+ fc = functools.partial(self.pre_run_th,
+ files=files,
+ force=self.force_prefill)
list(pool.map(fc, self.config.nodes))
- def pre_run_th(self, node, files):
+ def pre_run_th(self, node, files, force):
# fill files with pseudo-random data
rossh = run_on_node(node)
@@ -491,22 +542,23 @@
cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
if self.use_sudo:
cmd = "sudo " + cmd
- cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
+ cmd += " ; sudo chown {0} {1}".format(node.get_user(),
self.config.remote_dir)
+ rossh(cmd, nolog=True)
- rossh(cmd)
+ assert self.config.remote_dir != "" and self.config.remote_dir != "/"
+ rossh("rm -rf {0}/*".format(self.config.remote_dir), nolog=True)
+
except Exception as exc:
msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
- msg = msg.format(self.config.remote_dir, self.node.get_conn_id(), exc)
+ msg = msg.format(self.config.remote_dir, node.get_conn_id(), exc)
logger.exception(msg)
raise StopTestError(msg, exc)
- if files is not None:
- self.prefill_test_files(rossh, files)
+ self.install_utils(node, rossh)
+ self.prefill_test_files(rossh, files, force)
- self.install_utils(rossh)
-
- def run(self):
+ def show_test_execution_time(self):
if len(self.fio_configs) > 1:
# +10% - is a rough estimation for additional operations
# like sftp, etc
@@ -518,6 +570,11 @@
logger.info(msg.format(exec_time_s,
end_dt.strftime("%H:%M:%S")))
+ def run(self):
+ logger.debug("Run preparation")
+ self.pre_run()
+ self.show_test_execution_time()
+
tname = os.path.basename(self.config_fname)
if tname.endswith('.cfg'):
tname = tname[:-4]
@@ -525,8 +582,8 @@
barrier = Barrier(len(self.config.nodes))
results = []
- # set of OperationModeBlockSize str
- # which should not ne tested anymore, as
+ # set of Operation_Mode_BlockSize str's
+ # which should not be tested anymore, as
# they already too slow with previous thread count
lat_bw_limit_reached = set()
@@ -536,7 +593,6 @@
for pos, fio_cfg in enumerate(self.fio_configs):
test_descr = get_test_summary(fio_cfg.vals).split("th")[0]
if test_descr in lat_bw_limit_reached:
- logger.info("Will skip {0} test due to lat/bw limits".format(fio_cfg.name))
continue
else:
logger.info("Will run {0} test".format(fio_cfg.name))
@@ -591,11 +647,16 @@
res = load_test_results(IOTestResult, self.config.log_directory, pos)
results.append(res)
- test_res = res.get_params_from_fio_report()
if self.max_latency is not None:
- if self.max_latency < average(test_res['lat']):
+ lat_50, _ = res.get_lat_perc_50_95_multy()
+
+ # convert us to ms -- NOTE(review): no conversion is done here; confirm units
+ if self.max_latency < lat_50:
+ logger.info(("Will skip all subsequent tests of {0} " +
+ "due to lat/bw limits").format(fio_cfg.name))
lat_bw_limit_reached.add(test_descr)
+ test_res = res.get_params_from_fio_report()
if self.min_bw_per_thread is not None:
if self.min_bw_per_thread > average(test_res['bw']):
lat_bw_limit_reached.add(test_descr)
@@ -603,24 +664,34 @@
return results
def do_run(self, node, barrier, fio_cfg, pos, nolog=False):
- exec_folder = os.path.dirname(self.task_file)
+ if self.use_sudo:
+ sudo = "sudo "
+ else:
+ sudo = ""
+
bash_file = "#!/bin/bash\n" + \
"cd {exec_folder}\n" + \
- "fio --output-format=json --output={out_file} " + \
+ "{fio_path}fio --output-format=json --output={out_file} " + \
"--alloc-size=262144 {job_file} " + \
" >{err_out_file} 2>&1 \n" + \
"echo $? >{res_code_file}\n"
+ exec_folder = self.config.remote_dir
+
+ if self.use_system_fio:
+ fio_path = ""
+ else:
+ if not exec_folder.endswith("/"):
+ fio_path = exec_folder + "/"
+ else:
+ fio_path = exec_folder
+
bash_file = bash_file.format(out_file=self.results_file,
job_file=self.task_file,
err_out_file=self.err_out_file,
res_code_file=self.exit_code_file,
- exec_folder=exec_folder)
-
- run_on_node(node)("mkdir -p {0}".format(exec_folder), nolog=True)
-
- assert exec_folder != "" and exec_folder != "/"
- run_on_node(node)("rm -rf {0}/*".format(exec_folder), nolog=True)
+ exec_folder=exec_folder,
+ fio_path=fio_path)
with node.connection.open_sftp() as sftp:
save_to_remote(sftp, self.task_file, str(fio_cfg))
@@ -633,11 +704,6 @@
begin = time.time()
- if self.use_sudo:
- sudo = "sudo "
- else:
- sudo = ""
-
fnames_before = run_on_node(node)("ls -1 " + exec_folder, nolog=True)
barrier.wait()
@@ -685,6 +751,7 @@
files = collections.defaultdict(lambda: [])
all_files = [os.path.basename(self.results_file)]
new_files = set(fnames_after.split()) - set(fnames_before.split())
+
for fname in new_files:
if fname.endswith('.log') and fname.split('.')[0] in log_files_pref:
name, _ = os.path.splitext(fname)
@@ -719,12 +786,12 @@
raise StopTestError("fio failed")
rossh("rm -f {0}".format(arch_name), nolog=True)
- cmd = "cd {0} ; tar zcvf {1} {2}".format(exec_folder, arch_name, file_full_names)
- rossh(cmd, nolog=True)
+ pack_files_cmd = "cd {0} ; tar zcvf {1} {2}".format(exec_folder, arch_name, file_full_names)
+ rossh(pack_files_cmd, nolog=True)
sftp.get(arch_name, loc_arch_name)
- cmd = "cd {0} ; tar xvzf {1} >/dev/null".format(tmp_dir, loc_arch_name)
- subprocess.check_call(cmd, shell=True)
+ unpack_files_cmd = "cd {0} ; tar xvzf {1} >/dev/null".format(tmp_dir, loc_arch_name)
+ subprocess.check_call(unpack_files_cmd, shell=True)
os.unlink(loc_arch_name)
for ftype, fls in files.items():
@@ -739,8 +806,12 @@
loc_fname = "{0}_{1}_rawres.json".format(pos, conn_id)
loc_path = os.path.join(self.config.log_directory, loc_fname)
os.rename(cname, loc_path)
-
os.rmdir(tmp_dir)
+
+ remove_remote_res_files_cmd = "cd {0} ; rm -f {1} {2}".format(exec_folder,
+ arch_name,
+ file_full_names)
+ rossh(remove_remote_res_files_cmd, nolog=True)
return begin, end
@classmethod
@@ -772,7 +843,7 @@
tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
header = ["Name", "Description", "iops\ncum", "KiBps\ncum",
- "Cnf\n95%", "Dev%", "iops\nper vm", "KiBps\nper vm", "lat\nms"]
+ "Cnf\n95%", "Dev%", "iops\nper vm", "KiBps\nper vm", "lat ms\nmedian"]
tab.set_cols_align(["l", "l"] + ['r'] * (len(header) - 2))
sep = ["-------", "-----------"] + ["---"] * (len(header) - 2)
tab.header(header)
@@ -795,8 +866,7 @@
conf_perc = int(round(bw_conf * 100 / bw))
dev_perc = int(round(bw_dev * 100 / bw))
- lat, _ = test_dinfo.lat.rounded_average_conf()
- lat = round_3_digit(int(lat) // 1000)
+ lat = round_3_digit(int(test_dinfo.lat))
testnodes_count = len(item.config.nodes)
iops_per_vm = round_3_digit(iops / testnodes_count)
diff --git a/wally/utils.py b/wally/utils.py
index 1362000..ec4b8c0 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -6,6 +6,7 @@
import threading
import contextlib
import subprocess
+import collections
try:
import psutil
@@ -318,16 +319,31 @@
return user, passwd, tenant, auth_url
+os_release = collections.namedtuple("Distro", ["distro", "release", "arch"])
+
+
def get_os(run_func):
+ arch = run_func("arch", nolog=True).strip()
+
try:
- run_func("ls -l /etc/redhat-release")
- return 'redhat'
+ run_func("ls -l /etc/redhat-release", nolog=True)
+ return os_release('redhat', None, arch)
except:
pass
try:
- run_func("ls -l /etc/debian-release")
- return 'ubuntu'
+ run_func("ls -l /etc/debian_version", nolog=True)
+
+ release = None
+ for line in run_func("lsb_release -a", nolog=True).split("\n"):
+ if ':' not in line:
+ continue
+ opt, val = line.split(":", 1)
+
+ if opt == 'Codename':
+ release = val.strip()
+
+ return os_release('ubuntu', release, arch)
except:
pass