some fixes
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index 0f46b83..e566cbe 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -455,7 +455,7 @@
while self.check_running() and time.time() < soft_end_of_wait_time:
# time.sleep(soft_end_of_wait_time - time.time())
- time.sleep(2)
+ time.sleep(time_till_check)
while end_of_wait_time > time.time():
time.sleep(time_till_check)
diff --git a/wally/start_vms.py b/wally/start_vms.py
index b5eb17e..d9bde0c 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -2,6 +2,7 @@
import os
import stat
import time
+import urllib
import os.path
import logging
import warnings
@@ -14,6 +15,19 @@
from novaclient.client import Client as n_client
from cinderclient.v1.client import Client as c_client
+__doc__ = """
+Module used to reliably spawn a set of VMs, evenly distributed across an
+openstack cluster. Main functions:
+
+    get_OS_credentials - extract openstack credentials from different sources
+    nova_connect - connect to nova api
+    cinder_connect - connect to cinder api
+    find - find VM with given prefix in name
+    prepare_OS - prepare tenant for usage
+    launch_vms - reliably start a set of VMs in parallel with volumes and floating IPs
+    clear_all - clear VMs and volumes
+"""
+
import wally
from wally.discover import Node
@@ -82,13 +96,12 @@
return CINDER_CONNECTION
-def prepare_os_subpr(nova, params, os_creds):
+def prepare_os_subpr(nova, params, os_creds, max_vm_per_compute=8):
if os_creds is None:
os_creds = ostack_get_creds()
- MAX_VM_PER_NODE = 8
serv_groups = " ".join(map(params['aa_group_name'].format,
- range(MAX_VM_PER_NODE)))
+ range(max_vm_per_compute)))
image_name = params['image']['name']
env = os.environ.copy()
@@ -111,9 +124,6 @@
IMAGE_NAME=image_name,
IMAGE_URL=params['image']['url'],
-
- # KEYPAIR_NAME=params['keypair_name'],
- # KEY_FILE_NAME=params['keypair_file_private'],
))
spath = os.path.dirname(os.path.dirname(wally.__file__))
@@ -193,21 +203,20 @@
future.result()
-def prepare_os(nova, params):
+def prepare_os(nova, params, max_vm_per_compute=8):
allow_ssh(nova, params['security_group'])
MAX_VM_PER_NODE = 8
- serv_groups = " ".join(map(params['aa_group_name'].format,
- range(MAX_VM_PER_NODE)))
+ serv_groups = map(params['aa_group_name'].format,
+ range(MAX_VM_PER_NODE))
- shed_ids = []
- for shed_group in serv_groups:
- shed_ids.append(get_or_create_aa_group(nova, shed_group))
+    for serv_group in serv_groups:
+        get_or_create_aa_group(nova, serv_group)
create_keypair(nova,
params['keypair_name'],
- params['keypair_name'] + ".pub",
- params['keypair_name'] + ".pem")
+ params['keypair_file_public'],
+ params['keypair_file_private'])
create_image(nova, params['image']['name'],
params['image']['url'])
@@ -216,6 +225,21 @@
def create_keypair(nova, name, pub_key_path, priv_key_path):
+    """create and upload keypair into nova, if it doesn't exist yet
+
+    Create and upload a keypair into nova, if a keypair with the given
+    name doesn't exist yet. Uses keys from files; if the files don't
+    exist, creates new keys and stores them into the files.
+
+ parameters:
+ nova: nova connection
+        name: str - keypair name
+ pub_key_path: str - path for public key
+ priv_key_path: str - path for private key
+
+ returns: None
+ """
+
pub_key_exists = os.path.exists(pub_key_path)
priv_key_exists = os.path.exists(priv_key_path)
@@ -252,6 +276,14 @@
def get_or_create_aa_group(nova, name):
+    """create anti-affinity server group, if it doesn't exist yet
+
+ parameters:
+ nova: nova connection
+ name: str - group name
+
+ returns: str - group id
+ """
try:
group = nova.server_groups.find(name=name)
except NotFound:
@@ -262,6 +294,14 @@
def allow_ssh(nova, group_name):
+    """create security group for ping and ssh
+
+ parameters:
+ nova: nova connection
+ group_name: str - group name
+
+ returns: str - group id
+ """
try:
secgroup = nova.security_groups.find(name=group_name)
except NotFound:
@@ -282,12 +322,64 @@
return secgroup.id
-def create_image(nova, name, url):
- pass
+def create_image(nova, os_creds, name, url):
+    """upload image into glance from given URL, if given image doesn't exist yet
+
+ parameters:
+ nova: nova connection
+        os_creds: OSCreds object - openstack credentials, should be the same
+            as used when connecting the given novaclient
+ name: str - image name
+ url: str - image download url
+
+ returns: None
+ """
+ try:
+ nova.images.find(name=name)
+ return
+ except NotFound:
+ pass
+
+ tempnam = os.tempnam()
+
+ try:
+ urllib.urlretrieve(url, tempnam)
+
+ cmd = "OS_USERNAME={0.name}"
+ cmd += " OS_PASSWORD={0.passwd}"
+ cmd += " OS_TENANT_NAME={0.tenant}"
+ cmd += " OS_AUTH_URL={0.auth_url}"
+ cmd += " glance {1} image-create --name {2} $opts --file {3}"
+ cmd += " --disk-format qcow2 --container-format bare --is-public true"
+
+ cmd = cmd.format(os_creds,
+ '--insecure' if os_creds.insecure else "",
+ name,
+ tempnam)
+ finally:
+ if os.path.exists(tempnam):
+ os.unlink(tempnam)
def create_flavor(nova, name, ram_size, hdd_size, cpu_count):
- pass
+    """create flavor, if it doesn't exist yet
+
+ parameters:
+ nova: nova connection
+ name: str - flavor name
+ ram_size: int - ram size (UNIT?)
+ hdd_size: int - root hdd size (UNIT?)
+ cpu_count: int - cpu cores
+
+ returns: None
+ """
+ try:
+        nova.flavors.find(name=name)
+ return
+ except NotFound:
+ pass
+
+ nova.flavors.create(name, cpu_count, ram_size, hdd_size)
def create_volume(size, name):
@@ -312,6 +404,16 @@
def wait_for_server_active(nova, server, timeout=300):
+    """wait till server becomes active
+
+ parameters:
+ nova: nova connection
+ server: server object
+ timeout: int - seconds to wait till raise an exception
+
+ returns: None
+ """
+
t = time.time()
while True:
time.sleep(1)
@@ -334,6 +436,15 @@
def get_floating_ips(nova, pool, amount):
+    """allocate floating ips
+
+ parameters:
+ nova: nova connection
+        pool: str - floating ip pool name
+        amount: int - ip count
+
+ returns: [ip object]
+ """
ip_list = nova.floating_ips.list()
if pool is not None:
@@ -385,10 +496,8 @@
# precache all errors before start creating vms
private_key_path = params['keypair_file_private']
creds = params['image']['creds']
- creds.format(ip="1.1.1.1", private_key_path="/some_path/xx")
for ip, os_node in create_vms_mt(NOVA_CONNECTION, count, **vm_params):
-
conn_uri = creds.format(ip=ip, private_key_path=private_key_path)
yield Node(conn_uri, []), os_node.id
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 1a3e846..e86b5cd 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -15,7 +15,7 @@
import paramiko
import texttable
from paramiko.ssh_exception import SSHException
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, wait
import wally
from wally.pretty_yaml import dumps
@@ -68,6 +68,27 @@
return TimeSeriesValue(vals)
+READ_IOPS_DISCSTAT_POS = 3
+WRITE_IOPS_DISCSTAT_POS = 7
+
+
+def load_sys_log_file(ftype, fname):
+ assert ftype == 'iops'
+ pval = None
+ with open(fname) as fd:
+ iops = []
+ for ln in fd:
+ params = ln.split()
+ cval = int(params[WRITE_IOPS_DISCSTAT_POS]) + \
+ int(params[READ_IOPS_DISCSTAT_POS])
+ if pval is not None:
+ iops.append(cval - pval)
+ pval = cval
+
+ vals = [(idx * 1000, val) for idx, val in enumerate(iops)]
+ return TimeSeriesValue(vals)
+
+
def load_test_results(folder, run_num):
res = {}
params = None
@@ -94,6 +115,24 @@
conn_ids_set.add(conn_id)
+ rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.sys\.log$".format(run_num)
+ for fname in os.listdir(folder):
+ rm = re.match(rr, fname)
+ if rm is None:
+ continue
+
+ conn_id_s = rm.group('conn_id')
+ conn_id = conn_id_s.replace('_', ':')
+ ftype = rm.group('type')
+
+ if ftype not in ('iops', 'bw', 'lat'):
+ continue
+
+ ts = load_sys_log_file(ftype, os.path.join(folder, fname))
+ res.setdefault(ftype + ":sys", {}).setdefault(conn_id, []).append(ts)
+
+ conn_ids_set.add(conn_id)
+
mm_res = {}
if len(res) == 0:
@@ -217,11 +256,19 @@
self.fio_task = fio_task
self.idx = idx
- self.bw = ts_results.get('bw')
- self.lat = ts_results.get('lat')
- self.iops = ts_results.get('iops')
+ self.bw = ts_results['bw']
+ self.lat = ts_results['lat']
+ self.iops = ts_results['iops']
- res = {"bw": self.bw, "lat": self.lat, "iops": self.iops}
+ if 'iops:sys' in ts_results:
+ self.iops_sys = ts_results['iops:sys']
+ else:
+ self.iops_sys = None
+
+ res = {"bw": self.bw,
+ "lat": self.lat,
+ "iops": self.iops,
+ "iops:sys": self.iops_sys}
self.sensors_data = None
self._pinfo = None
@@ -321,6 +368,13 @@
pinfo.raw_bw = map(prepare, self.bw.per_vm())
pinfo.raw_iops = map(prepare, self.iops.per_vm())
+ if self.iops_sys is not None:
+ pinfo.raw_iops_sys = map(prepare, self.iops_sys.per_vm())
+ pinfo.iops_sys = data_property(agg_data(pinfo.raw_iops_sys))
+ else:
+ pinfo.raw_iops_sys = None
+ pinfo.iops_sys = None
+
fparams = self.get_params_from_fio_report()
fio_report_bw = sum(fparams['flt_bw'])
fio_report_iops = sum(fparams['flt_iops'])
@@ -397,6 +451,7 @@
self.task_file = self.join_remote("task.cfg")
self.sh_file = self.join_remote("cmd.sh")
self.err_out_file = self.join_remote("fio_err_out")
+ self.io_log_file = self.join_remote("io_log.txt")
self.exit_code_file = self.join_remote("exit_code")
self.max_latency = get("max_lat", None)
@@ -405,10 +460,7 @@
self.use_sudo = get("use_sudo", True)
self.raw_cfg = open(self.config_fname).read()
- self.fio_configs = fio_cfg_compile(self.raw_cfg,
- self.config_fname,
- self.config_params)
- self.fio_configs = list(self.fio_configs)
+ self.fio_configs = None
@classmethod
def load(cls, suite_name, folder):
@@ -559,6 +611,15 @@
rossh("chmod a+x " + self.join_remote("fio"), nolog=True)
def pre_run(self):
+ if 'FILESIZE' not in self.config_params:
+ # need to detect file size
+ pass
+
+ self.fio_configs = fio_cfg_compile(self.raw_cfg,
+ self.config_fname,
+ self.config_params)
+ self.fio_configs = list(self.fio_configs)
+
files = {}
for section in self.fio_configs:
sz = ssize2b(section.vals['size'])
@@ -676,8 +737,15 @@
if idx == max_retr - 1:
raise StopTestError("Fio failed", exc)
- logger.info("Sleeping 30s and retrying")
- time.sleep(30)
+                logger.info("Reconnecting, sleeping %ss and retrying", self.retry_time)
+
+ wait(pool.submit(node.connection.close)
+ for node in self.config.nodes)
+
+ time.sleep(self.retry_time)
+
+ wait(pool.submit(reconnect, node.connection, node.conn_url)
+ for node in self.config.nodes)
fname = "{0}_task.fio".format(pos)
with open(os.path.join(self.config.log_directory, fname), "w") as fd:
@@ -719,12 +787,42 @@
else:
sudo = ""
- bash_file = "#!/bin/bash\n" + \
- "cd {exec_folder}\n" + \
- "{fio_path}fio --output-format=json --output={out_file} " + \
- "--alloc-size=262144 {job_file} " + \
- " >{err_out_file} 2>&1 \n" + \
- "echo $? >{res_code_file}\n"
+ bash_file = """
+#!/bin/bash
+
+function get_dev() {{
+ if [ -b "$1" ] ; then
+ echo $1
+ else
+ echo $(df "$1" | tail -1 | awk '{{print $1}}')
+ fi
+}}
+
+function log_io_activiti(){{
+ local dest="$1"
+ local dev=$(get_dev "$2")
+ local sleep_time="$3"
+ dev=$(basename "$dev")
+
+ echo $dev
+
+ for (( ; ; )) ; do
+ grep -E "\\b$dev\\b" /proc/diskstats >> "$dest"
+ sleep $sleep_time
+ done
+}}
+
+sync
+cd {exec_folder}
+
+log_io_activiti {io_log_file} {test_file} 1 &
+pid="$!"
+
+{fio_path}fio --output-format=json --output={out_file} --alloc-size=262144 {job_file} >{err_out_file} 2>&1
+echo $? >{res_code_file}
+kill -9 $pid
+
+"""
exec_folder = self.config.remote_dir
@@ -741,7 +839,9 @@
err_out_file=self.err_out_file,
res_code_file=self.exit_code_file,
exec_folder=exec_folder,
- fio_path=fio_path)
+ fio_path=fio_path,
+ test_file=self.config_params['FILENAME'],
+ io_log_file=self.io_log_file).strip()
with node.connection.open_sftp() as sftp:
save_to_remote(sftp, self.task_file, str(fio_cfg))
@@ -813,6 +913,9 @@
tp, cnt = tp_cnt.split('.')
files[tp].append((int(cnt), fname))
all_files.append(fname)
+ elif fname == os.path.basename(self.io_log_file):
+ files['iops'].append(('sys', fname))
+ all_files.append(fname)
arch_name = self.join_remote('wally_result.tar.gz')
tmp_dir = os.path.join(self.config.log_directory, 'tmp_' + conn_id)
@@ -892,9 +995,21 @@
for item in sorted(results, key=key_func):
test_dinfo = item.disk_perf_info()
+ testnodes_count = len(item.config.nodes)
iops, _ = test_dinfo.iops.rounded_average_conf()
+ if test_dinfo.iops_sys is not None:
+ iops_sys, iops_sys_conf = test_dinfo.iops_sys.rounded_average_conf()
+ _, iops_sys_dev = test_dinfo.iops_sys.rounded_average_dev()
+ iops_sys_per_vm = round_3_digit(iops_sys / testnodes_count)
+ iops_sys = round_3_digit(iops_sys)
+ else:
+ iops_sys = None
+ iops_sys_per_vm = None
+ iops_sys_dev = None
+ iops_sys_conf = None
+
bw, bw_conf = test_dinfo.bw.rounded_average_conf()
_, bw_dev = test_dinfo.bw.rounded_average_dev()
conf_perc = int(round(bw_conf * 100 / bw))
@@ -904,7 +1019,6 @@
lat_95 = round_3_digit(int(test_dinfo.lat_95))
lat_avg = round_3_digit(int(test_dinfo.lat_avg))
- testnodes_count = len(item.config.nodes)
iops_per_vm = round_3_digit(iops / testnodes_count)
bw_per_vm = round_3_digit(bw / testnodes_count)
@@ -924,7 +1038,12 @@
"bw_per_vm": int(bw_per_vm),
"lat_50": lat_50,
"lat_95": lat_95,
- "lat_avg": lat_avg})
+ "lat_avg": lat_avg,
+
+ "iops_sys": iops_sys,
+ "iops_sys_per_vm": iops_sys_per_vm,
+ "sys_conf": iops_sys_conf,
+ "sys_dev": iops_sys_dev})
return res
@@ -933,6 +1052,7 @@
Field("Name", "name", "l", 7),
Field("Description", "summ", "l", 19),
Field("IOPS\ncum", "iops", "r", 3),
+ # Field("IOPS_sys\ncum", "iops_sys", "r", 3),
Field("KiBps\ncum", "bw", "r", 6),
Field("Cnf %\n95%", "conf", "r", 3),
Field("Dev%", "dev", "r", 3),
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 7882633..75ab2b4 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -6,10 +6,10 @@
runtime=15
# ---------------------------------------------------------------------
-[verify_{TEST_SUMM}]
-blocksize=4k
-rw=randwrite
-direct=1
+# [verify_{TEST_SUMM}]
+# blocksize=4k
+# rw=randwrite
+# direct=1
[verify_{TEST_SUMM}]
blocksize=4k
diff --git a/wally/utils.py b/wally/utils.py
index 2011a98..3fba2b0 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -370,3 +370,16 @@
for role, count in sorted(per_role.items()):
logger.debug("Found {0} nodes with role {1}".format(count, role))
+
+
+def which(program):
+ def is_exe(fpath):
+ return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
+
+ for path in os.environ["PATH"].split(os.pathsep):
+ path = path.strip('"')
+ exe_file = os.path.join(path, program)
+ if is_exe(exe_file):
+ return exe_file
+
+ return None