more improvements and fixes and new bugs
diff --git a/wally/discover/discover.py b/wally/discover/discover.py
index 3cd5cff..73407c0 100644
--- a/wally/discover/discover.py
+++ b/wally/discover/discover.py
@@ -27,11 +27,13 @@
"""
-def discover(ctx, discover, clusters_info, var_dir):
+def discover(ctx, discover, clusters_info, var_dir, discover_nodes=True):
nodes_to_run = []
clean_data = None
for cluster in discover:
- if cluster == "openstack":
+ if cluster == "openstack" and not discover_nodes:
+ logger.warning("Skip openstack cluster discovery")
+ elif cluster == "openstack" and discover_nodes:
cluster_info = clusters_info["openstack"]
conn = cluster_info['connection']
user, passwd, tenant = parse_creds(conn['creds'])
@@ -56,7 +58,9 @@
nodes_to_run.extend(os_nodes)
elif cluster == "fuel":
- res = fuel.discover_fuel_nodes(clusters_info['fuel'], var_dir)
+ res = fuel.discover_fuel_nodes(clusters_info['fuel'],
+ var_dir,
+ discover_nodes)
nodes, clean_data, openrc_dict = res
ctx.fuel_openstack_creds = {'name': openrc_dict['username'],
@@ -71,15 +75,20 @@
fuel_openrc_fname = os.path.join(var_dir,
env_f_name + "_openrc")
+
with open(fuel_openrc_fname, "w") as fd:
fd.write(openrc_templ.format(**ctx.fuel_openstack_creds))
+
msg = "Openrc for cluster {0} saves into {1}"
logger.debug(msg.format(env_name, fuel_openrc_fname))
nodes_to_run.extend(nodes)
elif cluster == "ceph":
- cluster_info = clusters_info["ceph"]
- nodes_to_run.extend(ceph.discover_ceph_nodes(cluster_info))
+ if discover_nodes:
+ cluster_info = clusters_info["ceph"]
+ nodes_to_run.extend(ceph.discover_ceph_nodes(cluster_info))
+ else:
+ logger.warning("Skip ceph cluster discovery")
else:
msg_templ = "Unknown cluster type in 'discover' parameter: {0!r}"
raise ValueError(msg_templ.format(cluster))
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
index 149ec31..a787786 100644
--- a/wally/discover/fuel.py
+++ b/wally/discover/fuel.py
@@ -18,7 +18,7 @@
BASE_PF_PORT = 44006
-def discover_fuel_nodes(fuel_data, var_dir):
+def discover_fuel_nodes(fuel_data, var_dir, discover_nodes=True):
username, tenant_name, password = parse_creds(fuel_data['creds'])
creds = {"username": username,
"tenant_name": tenant_name,
@@ -28,6 +28,11 @@
cluster_id = get_cluster_id(conn, fuel_data['openstack_env'])
cluster = reflect_cluster(conn, cluster_id)
+
+ if not discover_nodes:
+ logger.warning("Skip fuel cluster discovery")
+ return ([], None, cluster.get_openrc())
+
version = FuelInfo(conn).get_version()
fuel_nodes = list(cluster.get_nodes())
@@ -114,6 +119,9 @@
def clean_fuel_port_forwarding(clean_data):
+ if clean_data is None:
+ return
+
conn, iface, ips_ports = clean_data
for ip, port in ips_ports:
forward_ssh_port(conn, iface, port, ip, clean=True)
diff --git a/wally/run_test.py b/wally/run_test.py
index 42e96ff..b22ce38 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -2,6 +2,7 @@
import os
import sys
+import time
import Queue
import pprint
import logging
@@ -15,7 +16,7 @@
from concurrent.futures import ThreadPoolExecutor
from wally import pretty_yaml
-from wally.tests_sensors import deploy_sensors_stage
+from wally.sensors_utils import deploy_sensors_stage
from wally.discover import discover, Node, undiscover
from wally import utils, report, ssh_utils, start_vms
from wally.suits.itest import IOPerfTest, PgBenchTest
@@ -60,8 +61,11 @@
log_warns=log_warns)
else:
raise ValueError("Unknown url type {0}".format(node.conn_url))
- except Exception:
- logger.exception("During connect to " + node.get_conn_id())
+ except Exception as exc:
+ # logger.exception("During connect to " + node.get_conn_id())
+ msg = "During connect to {0}: {1}".format(node.get_conn_id(),
+ exc.message)
+ logger.error(msg)
node.connection = None
@@ -139,14 +143,23 @@
results = []
+ # MAX_WAIT_TIME = 10
+ # end_time = time.time() + MAX_WAIT_TIME
+
+ # while time.time() < end_time:
while True:
for th in threads:
th.join(1)
gather_results(res_q, results)
+ # if time.time() > end_time:
+ # break
if all(not th.is_alive() for th in threads):
break
+ # if any(th.is_alive() for th in threads):
+ # logger.warning("Some test threads still running")
+
gather_results(res_q, results)
yield name, test.merge_results(results)
@@ -190,8 +203,11 @@
if cfg.get('discover') is not None:
discover_objs = [i.strip() for i in cfg['discover'].strip().split(",")]
- nodes, clean_data = discover(ctx, discover_objs,
- cfg['clouds'], cfg['var_dir'])
+ nodes, clean_data = discover(ctx,
+ discover_objs,
+ cfg['clouds'],
+ cfg['var_dir'],
+ not cfg['dont_discover_nodes'])
def undiscover_stage(cfg, ctx):
undiscover(clean_data)
@@ -203,7 +219,7 @@
ctx.nodes.append(Node(url, roles.split(",")))
-def get_os_credentials(cfg, ctx, creds_type):
+def get_OS_credentials(cfg, ctx, creds_type):
creds = None
if creds_type == 'clouds':
@@ -243,7 +259,7 @@
os_nodes_ids = []
os_creds_type = config['creds']
- os_creds = get_os_credentials(cfg, ctx, os_creds_type)
+ os_creds = get_OS_credentials(cfg, ctx, os_creds_type)
start_vms.nova_connect(**os_creds)
@@ -294,10 +310,12 @@
nodes=new_nodes,
undeploy=False)
- for test_group in config.get('tests', []):
- ctx.results.extend(run_tests(test_group, ctx.nodes))
+ if not cfg['no_tests']:
+ for test_group in config.get('tests', []):
+ ctx.results.extend(run_tests(test_group, ctx.nodes))
else:
- ctx.results.extend(run_tests(group, ctx.nodes))
+ if not cfg['no_tests']:
+ ctx.results.extend(run_tests(group, ctx.nodes))
def shut_down_vms_stage(cfg, ctx):
@@ -353,10 +371,12 @@
def console_report_stage(cfg, ctx):
for tp, data in ctx.results:
if 'io' == tp and data is not None:
+ print("\n")
print(IOPerfTest.format_for_console(data))
+ print("\n")
-def report_stage(cfg, ctx):
+def html_report_stage(cfg, ctx):
html_rep_fname = cfg['html_report_file']
try:
@@ -409,10 +429,15 @@
parser.add_argument("-i", '--build_id', type=str, default="id")
parser.add_argument("-t", '--build_type', type=str, default="GA")
parser.add_argument("-u", '--username', type=str, default="admin")
+ parser.add_argument("-n", '--no-tests', action='store_true',
+ help="Don't run tests", default=False)
parser.add_argument("-p", '--post-process-only', metavar="VAR_DIR",
help="Only process data from previour run")
parser.add_argument("-k", '--keep-vm', action='store_true',
help="Don't remove test vm's", default=False)
+ parser.add_argument("-d", '--dont-discover-nodes', action='store_true',
+ help="Don't connect/discover fuel nodes",
+ default=False)
parser.add_argument("config_file")
return parser.parse_args(argv[1:])
@@ -423,9 +448,7 @@
if opts.post_process_only is not None:
stages = [
- load_data_from(opts.post_process_only),
- console_report_stage,
- report_stage
+ load_data_from(opts.post_process_only)
]
else:
stages = [
@@ -434,11 +457,14 @@
connect_stage,
deploy_sensors_stage,
run_tests_stage,
- store_raw_results_stage,
- console_report_stage,
- report_stage
+ store_raw_results_stage
]
+ report_stages = [
+ console_report_stage,
+ html_report_stage
+ ]
+
load_config(opts.config_file, opts.post_process_only)
if cfg_dict.get('logging', {}).get("extra_logs", False) or opts.extra_logs:
@@ -456,14 +482,20 @@
ctx.build_meta['build_descrption'] = opts.build_description
ctx.build_meta['build_type'] = opts.build_type
ctx.build_meta['username'] = opts.username
+
cfg_dict['keep_vm'] = opts.keep_vm
+ cfg_dict['no_tests'] = opts.no_tests
+ cfg_dict['dont_discover_nodes'] = opts.dont_discover_nodes
try:
for stage in stages:
logger.info("Start {0.__name__} stage".format(stage))
stage(cfg_dict, ctx)
except Exception as exc:
- msg = "Exception during current stage: {0}".format(exc.message)
+ emsg = exc.message
+ if emsg == "":
+ emsg = str(exc)
+ msg = "Exception during {0.__name__}: {1}".format(stage, emsg)
logger.error(msg)
finally:
exc, cls, tb = sys.exc_info()
@@ -477,5 +509,8 @@
if exc is not None:
raise exc, cls, tb
- logger.info("All info stored into {0}".format(cfg_dict['var_dir']))
+ for report_stage in report_stages:
+ report_stage(cfg_dict, ctx)
+
+ logger.info("All info stored in {0} folder".format(cfg_dict['var_dir']))
return 0
diff --git a/wally/sensors/storage/grafana.py b/wally/sensors/grafana.py
similarity index 100%
rename from wally/sensors/storage/grafana.py
rename to wally/sensors/grafana.py
diff --git a/wally/sensors/storage/influx_exporter.py b/wally/sensors/influx_exporter.py
similarity index 100%
rename from wally/sensors/storage/influx_exporter.py
rename to wally/sensors/influx_exporter.py
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
index e86bbed..2b6ab8b 100644
--- a/wally/sensors/main.py
+++ b/wally/sensors/main.py
@@ -50,12 +50,12 @@
sender = create_protocol(opts.url)
prev = {}
- while True:
- try:
- source_id = str(required_sensors.pop('source_id'))
- except KeyError:
- source_id = None
+ try:
+ source_id = str(required_sensors.pop('source_id'))
+ except KeyError:
+ source_id = None
+ while True:
gtime, data = get_values(required_sensors.items())
curr = {'time': SensorInfo(gtime, True)}
for name, val in data.items():
diff --git a/wally/sensors/storage/__init__.py b/wally/sensors/storage/__init__.py
deleted file mode 100644
index d7bf6aa..0000000
--- a/wally/sensors/storage/__init__.py
+++ /dev/null
@@ -1,104 +0,0 @@
-import struct
-
-
-def pack(val, tp=True):
- if isinstance(val, int):
- assert 0 <= val < 2 ** 16
-
- if tp:
- res = 'i'
- else:
- res = ""
-
- res += struct.pack("!U", val)
- elif isinstance(val, dict):
- assert len(val) < 2 ** 16
- if tp:
- res = "d"
- else:
- res = ""
-
- res += struct.pack("!U", len(val))
- for k, v in dict.items():
- assert 0 <= k < 2 ** 16
- assert 0 <= v < 2 ** 32
- res += struct.pack("!UI", k, v)
- elif isinstance(val, str):
- assert len(val) < 256
- if tp:
- res = "s"
- else:
- res = ""
- res += chr(len(val)) + val
- else:
- raise ValueError()
-
- return res
-
-
-def unpack(fd, tp=None):
- if tp is None:
- tp = fd.read(1)
-
- if tp == 'i':
- return struct.unpack("!U", fd.read(2))
- elif tp == 'd':
- res = {}
- val_len = struct.unpack("!U", fd.read(2))
- for _ in range(val_len):
- k, v = struct.unpack("!UI", fd.read(6))
- res[k] = v
- return res
- elif tp == 's':
- val_len = struct.unpack("!U", fd.read(2))
- return fd.read(val_len)
-
- raise ValueError()
-
-
-class LocalStorage(object):
- NEW_DATA = 0
- NEW_SENSOR = 1
- NEW_SOURCE = 2
-
- def __init__(self, fd):
- self.fd = fd
- self.sensor_ids = {}
- self.sources_ids = {}
- self.max_source_id = 0
- self.max_sensor_id = 0
-
- def add_data(self, source, sensor_values):
- source_id = self.sources_ids.get(source)
- if source_id is None:
- source_id = self.max_source_id
- self.sources_ids[source] = source_id
- self.emit(self.NEW_SOURCE, source_id, source)
- self.max_source_id += 1
-
- new_sensor_values = {}
-
- for name, val in sensor_values.items():
- sensor_id = self.sensor_ids.get(name)
- if sensor_id is None:
- sensor_id = self.max_sensor_id
- self.sensor_ids[name] = sensor_id
- self.emit(self.NEW_SENSOR, sensor_id, name)
- self.max_sensor_id += 1
- new_sensor_values[sensor_id] = val
-
- self.emit(self.NEW_DATA, source_id, new_sensor_values)
-
- def emit(self, tp, v1, v2):
- self.fd.write(chr(tp) + pack(v1, False) + pack(v2))
-
- def readall(self):
- tp = self.fd.read(1)
- if ord(tp) == self.NEW_DATA:
- pass
- elif ord(tp) == self.NEW_SENSOR:
- pass
- elif ord(tp) == self.NEW_SOURCE:
- pass
- else:
- raise ValueError()
diff --git a/wally/sensors/storage/grafana_template.js b/wally/sensors/storage/grafana_template.js
deleted file mode 100644
index 7c57924..0000000
--- a/wally/sensors/storage/grafana_template.js
+++ /dev/null
@@ -1,46 +0,0 @@
-/* global _ */
-
-/*
- * Complex scripted dashboard
- * This script generates a dashboard object that Grafana can load. It also takes a number of user
- * supplied URL parameters (int ARGS variable)
- *
- * Return a dashboard object, or a function
- *
- * For async scripts, return a function, this function must take a single callback function as argument,
- * call this callback function with the dashboard object (look at scripted_async.js for an example)
- */
-
-
-
-// accessable variables in this scope
-var window, document, ARGS, $, jQuery, moment, kbn;
-
-// Setup some variables
-var dashboard;
-
-// All url parameters are available via the ARGS object
-var ARGS;
-
-// Intialize a skeleton with nothing but a rows array and service object
-dashboard = {rows : []};
-
-// Set a title
-dashboard.title = 'Tests dash';
-
-// Set default time
-// time can be overriden in the url using from/to parameteres, but this is
-// handled automatically in grafana core during dashboard initialization
-dashboard.time = {
- from: "now-5m",
- to: "now"
-};
-
-dashboard.rows.push({
- title: 'Chart',
- height: '300px',
- panels: %s
-});
-
-
-return dashboard;
diff --git a/wally/sensors/storage/koder.js b/wally/sensors/storage/koder.js
deleted file mode 100644
index a65a454..0000000
--- a/wally/sensors/storage/koder.js
+++ /dev/null
@@ -1,47 +0,0 @@
-/* global _ */
-
-/*
- * Complex scripted dashboard
- * This script generates a dashboard object that Grafana can load. It also takes a number of user
- * supplied URL parameters (int ARGS variable)
- *
- * Return a dashboard object, or a function
- *
- * For async scripts, return a function, this function must take a single callback function as argument,
- * call this callback function with the dashboard object (look at scripted_async.js for an example)
- */
-
-
-
-// accessable variables in this scope
-var window, document, ARGS, $, jQuery, moment, kbn;
-
-// Setup some variables
-var dashboard;
-
-// All url parameters are available via the ARGS object
-var ARGS;
-
-// Intialize a skeleton with nothing but a rows array and service object
-dashboard = {rows : []};
-
-// Set a title
-dashboard.title = 'Tests dash';
-
-// Set default time
-// time can be overriden in the url using from/to parameteres, but this is
-// handled automatically in grafana core during dashboard initialization
-dashboard.time = {
- from: "now-5m",
- to: "now"
-};
-
-dashboard.rows.push({
- title: 'Chart',
- height: '300px',
- panels: [{"span": 12, "title": "writes_completed", "linewidth": 2, "type": "graph", "targets": [{"alias": "192.168.0.104 io sda1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"writes_completed\" where $timeFilter and host='192.168.0.104' and device='sda1' order asc"}, {"alias": "192.168.0.104 io rbd1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"writes_completed\" where $timeFilter and host='192.168.0.104' and device='rbd1' order asc"}], "tooltip": {"shared": true}, "fill": 1}, {"span": 12, "title": "sectors_written", "linewidth": 2, "type": "graph", "targets": [{"alias": "192.168.0.104 io sda1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"sectors_written\" where $timeFilter and host='192.168.0.104' and device='sda1' order asc"}, {"alias": "192.168.0.104 io rbd1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"sectors_written\" where $timeFilter and host='192.168.0.104' and device='rbd1' order asc"}], "tooltip": {"shared": true}, "fill": 1}]
-});
-
-
-return dashboard;
-
diff --git a/wally/tests_sensors.py b/wally/sensors_utils.py
similarity index 98%
rename from wally/tests_sensors.py
rename to wally/sensors_utils.py
index 1e0b1ad..3cd6bc8 100644
--- a/wally/tests_sensors.py
+++ b/wally/sensors_utils.py
@@ -75,7 +75,6 @@
mon_q = Queue.Queue()
fd = open(cfg_dict['sensor_storage'], "w")
- logger.info("Start sensors data receiving thread")
sensor_listen_th = threading.Thread(None, save_sensors_data, None,
(sensors_data_q, mon_q, fd))
sensor_listen_th.daemon = True
@@ -99,7 +98,6 @@
if nodes is None:
nodes = ctx.nodes
- logger.info("Deploing new sensors on {0} node(s)".format(len(nodes)))
monitored_nodes, sensors_configs = get_sensors_config_for_nodes(cfg,
nodes)
@@ -108,6 +106,7 @@
return
if ctx.sensors_mon_q is None:
+ logger.info("Start sensors data receiving thread")
ctx.sensors_mon_q = start_sensor_process_thread(ctx, cfg,
sensors_configs)
@@ -116,6 +115,7 @@
stop_and_remove_sensors(sensors_configs)
ctx.clear_calls_stack.append(remove_sensors_stage)
+ logger.info("Deploing new sensors on {0} node(s)".format(len(nodes)))
deploy_and_start_sensors(sensors_configs)
wait_for_new_sensors_data(ctx, monitored_nodes)
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index 0de7816..f1818ad 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -59,15 +59,15 @@
except socket.error:
retry_left = retry_count - i - 1
- if log_warns:
- msg = "Node {0.host}:{0.port} connection timeout."
+ if retry_left > 0:
+ if log_warns:
+ msg = "Node {0.host}:{0.port} connection timeout."
- if 0 != retry_left:
- msg += " {0} retry left.".format(retry_left)
+ if 0 != retry_left:
+ msg += " {0} retry left.".format(retry_left)
- logger.warning(msg.format(creds))
-
- if 0 == retry_left:
+ logger.warning(msg.format(creds))
+ else:
raise
time.sleep(1)
diff --git a/wally/start_vms.py b/wally/start_vms.py
index b3c141a..de1f312 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -18,28 +18,50 @@
logger = logging.getLogger("wally.vms")
-def ostack_get_creds():
- env = os.environ.get
- name = env('OS_USERNAME')
- passwd = env('OS_PASSWORD')
- tenant = env('OS_TENANT_NAME')
- auth_url = env('OS_AUTH_URL')
-
- return name, passwd, tenant, auth_url
-
-
+STORED_OPENSTACK_CREDS = None
NOVA_CONNECTION = None
+CINDER_CONNECTION = None
+
+
+def ostack_get_creds():
+ if STORED_OPENSTACK_CREDS is None:
+ env = os.environ.get
+ name = env('OS_USERNAME')
+ passwd = env('OS_PASSWORD')
+ tenant = env('OS_TENANT_NAME')
+ auth_url = env('OS_AUTH_URL')
+ return name, passwd, tenant, auth_url
+ else:
+ return STORED_OPENSTACK_CREDS
def nova_connect(name=None, passwd=None, tenant=None, auth_url=None):
global NOVA_CONNECTION
+ global STORED_OPENSTACK_CREDS
+
if NOVA_CONNECTION is None:
if name is None:
name, passwd, tenant, auth_url = ostack_get_creds()
+ else:
+ STORED_OPENSTACK_CREDS = (name, passwd, tenant, auth_url)
+
NOVA_CONNECTION = n_client('1.1', name, passwd, tenant, auth_url)
return NOVA_CONNECTION
+def cinder_connect(name=None, passwd=None, tenant=None, auth_url=None):
+ global CINDER_CONNECTION
+ global STORED_OPENSTACK_CREDS
+
+ if CINDER_CONNECTION is None:
+ if name is None:
+ name, passwd, tenant, auth_url = ostack_get_creds()
+ else:
+ STORED_OPENSTACK_CREDS = (name, passwd, tenant, auth_url)
+ CINDER_CONNECTION = c_client(name, passwd, tenant, auth_url)
+ return CINDER_CONNECTION
+
+
def nova_disconnect():
global NOVA_CONNECTION
if NOVA_CONNECTION is not None:
@@ -144,12 +166,15 @@
def create_volume(size, name):
- cinder = c_client(*ostack_get_creds())
+ cinder = cinder_connect()
+ # vol_id = "2974f227-8755-4333-bcae-cd9693cd5d04"
+ # logger.warning("Reusing volume {0}".format(vol_id))
+ # vol = cinder.volumes.get(vol_id)
vol = cinder.volumes.create(size=size, display_name=name)
err_count = 0
- while vol['status'] != 'available':
- if vol['status'] == 'error':
+ while vol.status != 'available':
+ if vol.status == 'error':
if err_count == 3:
logger.critical("Fail to create volume")
raise RuntimeError("Fail to create volume")
@@ -160,7 +185,7 @@
vol = cinder.volumes.create(size=size, display_name=name)
continue
time.sleep(1)
- vol = cinder.volumes.get(vol['id'])
+ vol = cinder.volumes.get(vol.id)
return vol
@@ -300,7 +325,6 @@
nova.servers.delete(srv)
for j in range(120):
- # print "wait till server deleted"
all_id = set(alive_srv.id for alive_srv in nova.servers.list())
if srv.id not in all_id:
break
@@ -313,16 +337,13 @@
raise RuntimeError("Failed to start server".format(srv.id))
if vol_sz is not None:
- # print "creating volume"
vol = create_volume(vol_sz, name)
- # print "attach volume to server"
- nova.volumes.create_server_volume(srv.id, vol['id'], None)
+ nova.volumes.create_server_volume(srv.id, vol.id, None)
if flt_ip is Allocate:
flt_ip = nova.floating_ips.create(pool)
if flt_ip is not None:
- # print "attaching ip to server"
srv.add_floating_ip(flt_ip)
return flt_ip.ip, nova.servers.get(srv.id)
@@ -332,7 +353,7 @@
clear_all(NOVA_CONNECTION, nodes_ids, None)
-def clear_all(nova, ids=None, name_templ="ceph-test-{0}"):
+def clear_all(nova, ids=None, name_templ=None):
def need_delete(srv):
if name_templ is not None:
@@ -340,6 +361,14 @@
else:
return srv.id in ids
+ volumes_to_delete = []
+ cinder = cinder_connect()
+ for vol in cinder.volumes.list():
+ for attachment in vol.attachments:
+ if attachment['server_id'] in ids:
+ volumes_to_delete.append(vol)
+ break
+
deleted_srvs = set()
for srv in nova.servers.list():
if need_delete(srv):
@@ -360,13 +389,9 @@
# wait till vm actually deleted
- if name_templ is not None:
- cinder = c_client(*ostack_get_creds())
- for vol in cinder.volumes.list():
- if isinstance(vol.display_name, basestring):
- if re.match(name_templ.format("\\d+"), vol.display_name):
- if vol.status in ('available', 'error'):
- logger.debug("Deleting volume " + vol.display_name)
- cinder.volumes.delete(vol)
+ logger.warning("Volume deletion commented out")
+ # for vol in volumes_to_delete:
+ # logger.debug("Deleting volume " + vol.display_name)
+ # cinder.volumes.delete(vol)
logger.debug("Clearing done (yet some volumes may still deleting)")
diff --git a/wally/statistic.py b/wally/statistic.py
index 0729283..f3fcd6a 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -15,6 +15,10 @@
return med, dev
+def round_3_digit(val):
+ return round_deviation((val, val / 10.0))[0]
+
+
def round_deviation(med_dev):
med, dev = med_dev
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index 7589346..d15d18e 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -55,6 +55,40 @@
counter = [0]
+def extract_iterables(vals):
+ iterable_names = []
+ iterable_values = []
+ rest = {}
+
+ for val_name, val in vals.items():
+ if val is None or not val.startswith('{%'):
+ rest[val_name] = val
+ else:
+ assert val.endswith("%}")
+ content = val[2:-2]
+ iterable_names.append(val_name)
+ iterable_values.append(list(i.strip() for i in content.split(',')))
+
+ return iterable_names, iterable_values, rest
+
+
+def format_params_into_section(sec, params, final=True):
+ processed_vals = {}
+
+ for val_name, val in sec.items():
+ if val is None:
+ processed_vals[val_name] = val
+ else:
+ try:
+ processed_vals[val_name] = val.format(**params)
+ except KeyError:
+ if final:
+ raise
+ processed_vals[val_name] = val
+
+ return processed_vals
+
+
def process_section(name, vals, defaults, format_params):
vals = vals.copy()
params = format_params.copy()
@@ -67,30 +101,25 @@
repeat = 1
# this code can be optimized
- iterable_names = []
- iterable_values = []
- processed_vals = {}
-
- for val_name, val in vals.items():
- if val is None:
- processed_vals[val_name] = val
- # remove hardcode
- elif val.startswith('{%'):
- assert val.endswith("%}")
- content = val[2:-2].format(**params)
- iterable_names.append(val_name)
- iterable_values.append(list(i.strip() for i in content.split(',')))
- else:
- processed_vals[val_name] = val.format(**params)
+ iterable_names, iterable_values, processed_vals = extract_iterables(vals)
group_report_err_msg = "Group reporting should be set if numjobs != 1"
if iterable_values == []:
+ processed_vals = format_params_into_section(processed_vals, params,
+ final=False)
params['UNIQ'] = 'UN{0}'.format(counter[0])
counter[0] += 1
params['TEST_SUMM'] = get_test_summary(processed_vals)
- if processed_vals.get('numjobs', '1') != '1':
+ num_jobs = int(processed_vals.get('numjobs', '1'))
+ fsize = to_bytes(processed_vals['size'])
+ params['PER_TH_OFFSET'] = fsize // num_jobs
+
+ processed_vals = format_params_into_section(processed_vals, params,
+ final=True)
+
+ if num_jobs != 1:
assert 'group_reporting' in processed_vals, group_report_err_msg
ramp_time = processed_vals.get('ramp_time')
@@ -104,11 +133,21 @@
processed_vals['ramp_time'] = ramp_time
else:
for it_vals in itertools.product(*iterable_values):
+ processed_vals = format_params_into_section(processed_vals, params,
+ final=False)
+
processed_vals.update(dict(zip(iterable_names, it_vals)))
params['UNIQ'] = 'UN{0}'.format(counter[0])
counter[0] += 1
params['TEST_SUMM'] = get_test_summary(processed_vals)
+ num_jobs = int(processed_vals.get('numjobs', '1'))
+ fsize = to_bytes(processed_vals['size'])
+ params['PER_TH_OFFSET'] = fsize // num_jobs
+
+ processed_vals = format_params_into_section(processed_vals, params,
+ final=True)
+
if processed_vals.get('numjobs', '1') != '1':
assert 'group_reporting' in processed_vals,\
group_report_err_msg
@@ -130,6 +169,7 @@
time = 0
for _, params in combinations:
time += int(params.get('ramp_time', 0))
+ time += int(params.get('_ramp_time', 0))
time += int(params.get('runtime', 0))
return time
@@ -236,6 +276,8 @@
return (1024 ** 2) * int(sz[:-1])
if sz[-1] == 'k':
return 1024 * int(sz[:-1])
+ if sz[-1] == 'g':
+ return (1024 ** 3) * int(sz[:-1])
raise
@@ -348,12 +390,17 @@
def do_run_fio(bconf):
benchmark_config = format_fio_config(bconf)
cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
- p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+ p = subprocess.Popen(cmd,
+ stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
+ stderr=subprocess.PIPE)
# set timeout
- raw_out, _ = p.communicate(benchmark_config)
+ raw_out, raw_err = p.communicate(benchmark_config)
+
+ if 0 != p.returncode:
+ msg = "Fio failed with code: {0}\nOutput={1}"
+ raise OSError(msg.format(p.returncode, raw_err))
try:
parsed_out = json.loads(raw_out)["jobs"]
@@ -373,7 +420,14 @@
MAX_JOBS = 1000
-def next_test_portion(whole_conf, runcycle):
+def next_test_portion(whole_conf, runcycle, cluster=False):
+ if cluster:
+ for name, sec in whole_conf:
+ if '_ramp_time' in sec:
+ sec['ramp_time'] = sec.pop('_ramp_time')
+ yield [(name, sec)]
+ return
+
jcount = 0
runtime = 0
bconf = []
@@ -458,12 +512,14 @@
res[jname] = j_res
-def compile(benchmark_config, params, runcycle=None):
+def compile(benchmark_config, params, skip=0, runcycle=None, cluster=False):
whole_conf = list(parse_fio_config_full(benchmark_config, params))
+ whole_conf = whole_conf[skip:]
res = ""
- for bconf in next_test_portion(whole_conf, runcycle):
+ for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
res += format_fio_config(bconf)
+ res += "\n#" + "-" * 50 + "\n\n"
return res
@@ -473,7 +529,8 @@
runcycle=None,
raw_results_func=None,
skip_tests=0,
- fake_fio=False):
+ fake_fio=False,
+ cluster=False):
whole_conf = list(parse_fio_config_full(benchmark_config, params))
whole_conf = whole_conf[skip_tests:]
@@ -482,7 +539,7 @@
executed_tests = 0
ok = True
try:
- for bconf in next_test_portion(whole_conf, runcycle):
+ for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
if fake_fio:
res_cfg_it = do_run_fio_fake(bconf)
@@ -504,7 +561,7 @@
continue
add_job_results(jname, job_output, jconfig, res)
-
+ curr_test_num += 1
msg_template = "Done {0} tests from {1}. ETA: {2}"
exec_time = estimate_cfg(benchmark_config, params, curr_test_num)
@@ -588,6 +645,8 @@
help="Skip NUM tests")
parser.add_argument("--faked-fio", action='store_true',
default=False, help="Emulate fio with 0 test time")
+ parser.add_argument("--cluster", action='store_true',
+ default=False, help="Apply cluster-test settings")
parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
default=[],
help="Provide set of pairs PARAM=VAL to" +
@@ -620,14 +679,16 @@
return 0
if argv_obj.num_tests or argv_obj.compile:
- bconf = list(parse_fio_config_full(job_cfg, params))
- bconf = bconf[argv_obj.skip_tests:]
-
if argv_obj.compile:
- out_fd.write(format_fio_config(bconf))
+ data = compile(job_cfg, params, argv_obj.skip_tests,
+ cluster=argv_obj.cluster)
+ out_fd.write(data)
out_fd.write("\n")
if argv_obj.num_tests:
+ bconf = list(parse_fio_config_full(job_cfg, params,
+ argv_obj.cluster))
+ bconf = bconf[argv_obj.skip_tests:]
print len(bconf)
return 0
@@ -653,7 +714,8 @@
argv_obj.runcycle,
rrfunc,
argv_obj.skip_tests,
- argv_obj.faked_fio)
+ argv_obj.faked_fio,
+ cluster=argv_obj.cluster)
etime = time.time()
res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}
diff --git a/wally/suits/io/formatter.py b/wally/suits/io/formatter.py
index 529b78a..a1da1c3 100644
--- a/wally/suits/io/formatter.py
+++ b/wally/suits/io/formatter.py
@@ -1,8 +1,8 @@
import texttable
from wally.utils import ssize_to_b
-from wally.statistic import med_dev
from wally.suits.io.agent import get_test_summary
+from wally.statistic import med_dev, round_deviation, round_3_digit
def key_func(k_data):
@@ -37,14 +37,24 @@
descr = get_test_summary(data)
- iops, _ = med_dev(data['iops'])
- bw, bwdev = med_dev(data['bw'])
+ iops, _ = round_deviation(med_dev(data['iops']))
+ bw, bwdev = round_deviation(med_dev(data['bw']))
# 3 * sigma
- dev_perc = int((bwdev * 300) / bw)
+ if 0 == bw:
+ assert 0 == bwdev
+ dev_perc = 0
+ else:
+ dev_perc = int((bwdev * 300) / bw)
- params = (descr, int(iops), int(bw), dev_perc,
- int(med_dev(data['lat'])[0]) // 1000)
+ med_lat, _ = round_deviation(med_dev(data['lat']))
+ med_lat = int(med_lat) // 1000
+
+ iops = round_3_digit(iops)
+ bw = round_3_digit(bw)
+ med_lat = round_3_digit(med_lat)
+
+ params = (descr, int(iops), int(bw), dev_perc, med_lat)
tab.add_row(params)
header = ["Description", "IOPS", "BW KiBps", "Dev * 3 %", "clat ms"]
diff --git a/wally/suits/io/io_scenario_ceph.cfg b/wally/suits/io/io_scenario_ceph.cfg
new file mode 100644
index 0000000..5e793f2
--- /dev/null
+++ b/wally/suits/io/io_scenario_ceph.cfg
@@ -0,0 +1,62 @@
+[defaults]
+wait_for_previous
+group_reporting
+time_based
+buffered=0
+iodepth=1
+softrandommap=1
+filename={FILENAME}
+NUM_ROUNDS=7
+
+size=5G
+ramp_time=20
+runtime=20
+
+# ---------------------------------------------------------------------
+# check different thread counts, direct sequential read mode. (bw, iops) = func(th_count)
+# also check BW for seq read.
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=1m
+rw=read
+direct=1
+offset_increment={PER_TH_OFFSET}
+numjobs={% 20, 120 %}
+
+# # ---------------------------------------------------------------------
+# # check different thread count, sync mode. (latency, iops) = func(th_count)
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=4k
+# rw=randwrite
+# sync=1
+# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+#
+# # ---------------------------------------------------------------------
+# # check different thread count, direct read mode. (latency, iops) = func(th_count)
+# # also check iops for randread
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=4k
+# rw=randread
+# direct=1
+# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+#
+# # ---------------------------------------------------------------------
+# # check different thread count, direct read/write mode. (bw, iops) = func(th_count)
+# # also check BW for seq read/write.
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=1m
+# rw={% read, write %}
+# direct=1
+# offset_increment=1073741824 # 1G
+# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+#
+# # ---------------------------------------------------------------------
+# # check IOPS randwrite.
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=4k
+# rw=randwrite
+# direct=1
diff --git a/wally/suits/io/io_scenario_hdd.cfg b/wally/suits/io/io_scenario_hdd.cfg
index 5e24009..46191f2 100644
--- a/wally/suits/io/io_scenario_hdd.cfg
+++ b/wally/suits/io/io_scenario_hdd.cfg
@@ -8,8 +8,8 @@
filename={FILENAME}
NUM_ROUNDS=7
-ramp_time=5
size=10Gb
+ramp_time=5
runtime=30
# ---------------------------------------------------------------------
@@ -39,6 +39,7 @@
blocksize=1m
rw={% read, write %}
direct=1
+offset_increment=1073741824 # 1G
numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
# ---------------------------------------------------------------------
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 1e93247..c5615bb 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -137,29 +137,45 @@
self.files_to_copy = {local_fname: self.io_py_remote}
copy_paths(conn, self.files_to_copy)
- files = {}
+ if self.options.get('prefill_files', True):
+ files = {}
- for secname, params in self.configs:
- sz = ssize_to_b(params['size'])
- msz = msz = sz / (1024 ** 2)
- if sz % (1024 ** 2) != 0:
- msz += 1
+ for secname, params in self.configs:
+ sz = ssize_to_b(params['size'])
+ msz = sz / (1024 ** 2)
+ if sz % (1024 ** 2) != 0:
+ msz += 1
- fname = params['filename']
- files[fname] = max(files.get(fname, 0), msz)
+ fname = params['filename']
- # logger.warning("dd run DISABLED")
- # cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+ # if another test already uses the same file name,
+ # keep the largest of the requested sizes
+ files[fname] = max(files.get(fname, 0), msz)
- cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
- for fname, sz in files.items():
- cmd = cmd_templ.format(fname, 1024 ** 2, msz)
- run_over_ssh(conn, cmd, timeout=msz, node=self.node)
+ # logger.warning("dd run DISABLED")
+ # cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+
+ cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
+ ssize = 0
+ stime = time.time()
+
+ for fname, curr_sz in files.items():
+ cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
+ ssize += curr_sz
+ run_over_ssh(conn, cmd, timeout=curr_sz, node=self.node)
+
+ ddtime = time.time() - stime
+ if ddtime > 1E-3:
+ fill_bw = int(ssize / ddtime)
+ mess = "Initiall dd fill bw is {0} MiBps for this vm"
+ logger.info(mess.format(fill_bw))
+ else:
+ logger.warning("Test files prefill disabled")
def run(self, conn, barrier):
# logger.warning("No tests runned")
# return
- cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
+ cmd_templ = "sudo env python2 {0} {3} --type {1} {2} --json -"
# cmd_templ = "env python2 {0} --type {1} {2} --json -"
params = " ".join("{0}={1}".format(k, v)
@@ -168,16 +184,24 @@
if "" != params:
params = "--params " + params
- cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
+ if self.options.get('cluster', False):
+ logger.info("Cluster mode is used")
+ cluster_opt = "--cluster"
+ else:
+ logger.info("Non-cluster mode is used")
+ cluster_opt = ""
+
+ cmd = cmd_templ.format(self.io_py_remote, self.tool, params,
+ cluster_opt)
logger.debug("Waiting on barrier")
exec_time = io_agent.estimate_cfg(self.raw_cfg, self.config_params)
exec_time_str = sec_to_str(exec_time)
try:
+ timeout = int(exec_time * 1.2 + 300)
if barrier.wait():
templ = "Test should takes about {0}. Will wait at most {1}"
- timeout = int(exec_time * 1.1 + 300)
logger.info(templ.format(exec_time_str, sec_to_str(timeout)))
out_err = run_over_ssh(conn, cmd,