a lot of changes
diff --git a/wally/config.py b/wally/config.py
index 08a70d7..90fde3c 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -64,6 +64,7 @@
cfg_dict['text_report_file'] = in_var_dir('report.txt')
cfg_dict['log_file'] = in_var_dir('log.txt')
cfg_dict['sensor_storage'] = in_var_dir('sensor_storage.txt')
+ cfg_dict['nodes_report_file'] = in_var_dir('nodes.yaml')
testnode_log_root = cfg_dict.get('testnode_log_root', '/var/wally')
testnode_log_dir = os.path.join(testnode_log_root, "{0}/{{name}}")
@@ -73,6 +74,10 @@
cfg_dict['test_log_directory'] = in_var_dir('test_logs')
mkdirs_if_unxists(cfg_dict['test_log_directory'])
+ cfg_dict['hwinfo_directory'] = in_var_dir('hwinfo')
+ cfg_dict['hwreport_fname'] = in_var_dir('hwinfo.txt')
+ mkdirs_if_unxists(cfg_dict['hwinfo_directory'])
+
def color_me(color):
RESET_SEQ = "\033[0m"
diff --git a/wally/discover/__init__.py b/wally/discover/__init__.py
index b02809c..3ac983e 100644
--- a/wally/discover/__init__.py
+++ b/wally/discover/__init__.py
@@ -1,5 +1,5 @@
"this package contains node discovery code"
-from .discover import discover, undiscover
from .node import Node
+from .discover import discover
-__all__ = ["discover", "Node", "undiscover"]
+__all__ = ["discover", "Node"]
diff --git a/wally/discover/discover.py b/wally/discover/discover.py
index 3cab884..5802ac3 100644
--- a/wally/discover/discover.py
+++ b/wally/discover/discover.py
@@ -96,9 +96,4 @@
msg_templ = "Unknown cluster type in 'discover' parameter: {0!r}"
raise ValueError(msg_templ.format(cluster))
- return nodes_to_run, clean_data
-
-
-def undiscover(clean_data):
- if clean_data is not None:
- fuel.clean_fuel_port_forwarding(clean_data)
+ return nodes_to_run
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
index a787786..7a14f94 100644
--- a/wally/discover/fuel.py
+++ b/wally/discover/fuel.py
@@ -1,15 +1,18 @@
-import os
import re
-import sys
import socket
import logging
from urlparse import urlparse
-import yaml
+import sshtunnel
+from paramiko import AuthenticationException
+
+
from wally.fuel_rest_api import (KeystoneAuth, get_cluster_id,
reflect_cluster, FuelInfo)
-from wally.utils import parse_creds
-from wally.ssh_utils import run_over_ssh, connect
+from wally.utils import (parse_creds, check_input_param, StopTestError,
+ clean_resource, get_ip_for_target)
+from wally.ssh_utils import (run_over_ssh, connect, set_key_for_node,
+ read_from_remote)
from .node import Node
@@ -26,6 +29,9 @@
conn = KeystoneAuth(fuel_data['url'], creds, headers=None)
+ msg = "openstack_env should be provided in fuel config"
+ check_input_param('openstack_env' in fuel_data, msg)
+
cluster_id = get_cluster_id(conn, fuel_data['openstack_env'])
cluster = reflect_cluster(conn, cluster_id)
@@ -45,55 +51,57 @@
fuel_host = urlparse(fuel_data['url']).hostname
fuel_ip = socket.gethostbyname(fuel_host)
- ssh_conn = connect("{0}@@{1}".format(ssh_creds, fuel_host))
+
+ try:
+ ssh_conn = connect("{0}@{1}".format(ssh_creds, fuel_host))
+ except AuthenticationException:
+ raise StopTestError("Wrong fuel credentials")
+ except Exception:
+ logger.exception("While connection to FUEL")
+ raise StopTestError("Failed to connect to FUEL")
fuel_ext_iface = get_external_interface(ssh_conn, fuel_ip)
- # TODO: keep ssh key in memory
- # http://stackoverflow.com/questions/11994139/how-to-include-the-private-key-in-paramiko-after-fetching-from-string
- fuel_key_file = os.path.join(var_dir, "fuel_master_node_id_rsa")
- download_master_key(ssh_conn, fuel_key_file)
+ logger.debug("Downloading fuel master key")
+ fuel_key = download_master_key(ssh_conn)
nodes = []
- ports = range(BASE_PF_PORT, BASE_PF_PORT + len(fuel_nodes))
ips_ports = []
- for fuel_node, port in zip(fuel_nodes, ports):
- ip = fuel_node.get_ip(network)
- forward_ssh_port(ssh_conn, fuel_ext_iface, port, ip)
+ logger.info("Forwarding ssh ports from FUEL nodes localhost")
+ fuel_usr, fuel_passwd = ssh_creds.split(":", 1)
+ ips = [str(fuel_node.get_ip(network)) for fuel_node in fuel_nodes]
+ port_fw = forward_ssh_ports(fuel_host, fuel_usr, fuel_passwd, ips)
+ listen_ip = get_ip_for_target(fuel_host)
- conn_url = "ssh://root@{0}:{1}:{2}".format(fuel_host,
- port,
- fuel_key_file)
+ for port, fuel_node, ip in zip(port_fw, fuel_nodes, ips):
+ logger.debug(
+ "SSH port forwarding {0} => localhost:{1}".format(ip, port))
+
+ conn_url = "ssh://root@127.0.0.1:{0}".format(port)
+ set_key_for_node(('127.0.0.1', port), fuel_key)
+
node = Node(conn_url, fuel_node['roles'])
- node.monitor_url = None
+ node.monitor_ip = listen_ip
nodes.append(node)
ips_ports.append((ip, port))
logger.debug("Found %s fuel nodes for env %r" %
(len(nodes), fuel_data['openstack_env']))
- # return ([],
- # (ssh_conn, fuel_ext_iface, ips_ports),
- # cluster.get_openrc())
-
return (nodes,
(ssh_conn, fuel_ext_iface, ips_ports),
cluster.get_openrc())
-def download_master_key(conn, dest):
+def download_master_key(conn):
# download master key
- sftp = conn.open_sftp()
- sftp.get('/root/.ssh/id_rsa', dest)
- os.chmod(dest, 0o400)
- sftp.close()
-
- logger.debug("Fuel master key stored in {0}".format(dest))
+ with conn.open_sftp() as sftp:
+ return read_from_remote(sftp, '/root/.ssh/id_rsa')
def get_external_interface(conn, ip):
- data = run_over_ssh(conn, "ip a", node='fuel-master')
+ data = run_over_ssh(conn, "ip a", node='fuel-master', nolog=True)
curr_iface = None
for line in data.split("\n"):
@@ -109,38 +117,14 @@
raise KeyError("Can't found interface for ip {0}".format(ip))
-def forward_ssh_port(conn, iface, new_port, ip, clean=False):
- mode = "-D" if clean is True else "-A"
- cmd = "iptables -t nat {mode} PREROUTING -p tcp " + \
- "-i {iface} --dport {port} -j DNAT --to {ip}:22"
- run_over_ssh(conn,
- cmd.format(iface=iface, port=new_port, ip=ip, mode=mode),
- node='fuel-master')
-
-
-def clean_fuel_port_forwarding(clean_data):
- if clean_data is None:
- return
-
- conn, iface, ips_ports = clean_data
- for ip, port in ips_ports:
- forward_ssh_port(conn, iface, port, ip, clean=True)
-
-
-def main(argv):
- fuel_data = yaml.load(open(sys.argv[1]).read())['clouds']['fuel']
- nodes, to_clean, openrc = discover_fuel_nodes(fuel_data, '/tmp')
-
- print nodes
- print openrc
- print "Ready to test"
-
- sys.stdin.readline()
-
- clean_fuel_port_forwarding(to_clean)
-
- return 0
-
-
-if __name__ == "__main__":
- main(sys.argv[1:])
+def forward_ssh_ports(proxy_ip, proxy_user, proxy_passwd, ips):
+ for ip in ips:
+ tunnel = sshtunnel.open(
+ (proxy_ip, 22),
+ ssh_username=proxy_user,
+ ssh_password=proxy_passwd,
+ threaded=True,
+ remote_bind_address=(ip, 22))
+ tunnel.__enter__()
+ clean_resource(tunnel.__exit__, None, None, None)
+ yield tunnel.local_bind_port
diff --git a/wally/discover/node.py b/wally/discover/node.py
index 9f4356f..9161a21 100644
--- a/wally/discover/node.py
+++ b/wally/discover/node.py
@@ -9,7 +9,7 @@
self.roles = roles
self.conn_url = conn_url
self.connection = None
- self.monitor_url = None
+ self.monitor_ip = None
def get_ip(self):
if self.conn_url == 'local':
diff --git a/wally/fuel_rest_api.py b/wally/fuel_rest_api.py
index 0567e71..737bf2e 100644
--- a/wally/fuel_rest_api.py
+++ b/wally/fuel_rest_api.py
@@ -287,7 +287,6 @@
if net['name'] == network:
iface_name = net['dev']
for iface in self.get_info()['meta']['interfaces']:
- print iface, net['ip']
if iface['name'] == iface_name:
try:
return iface['ip']
diff --git a/wally/hw_info.py b/wally/hw_info.py
new file mode 100644
index 0000000..9d38913
--- /dev/null
+++ b/wally/hw_info.py
@@ -0,0 +1,330 @@
+import re
+import xml.etree.ElementTree as ET
+
+from wally import ssh_utils, utils
+
+
+def get_data(rr, data):
+ match_res = re.search("(?ims)" + rr, data)
+ return match_res.group(0)
+
+
+class HWInfo(object):
+ def __init__(self):
+ self.hostname = None
+ self.cores = []
+
+ # /dev/... devices
+ self.disks_info = {}
+
+ # real disks on raid controller
+ self.disks_raw_info = {}
+
+ self.net_info = {}
+ self.ram_size = 0
+ self.sys_name = None
+ self.mb = None
+ self.raw = None
+ self.storage_controllers = []
+
+ def get_summary(self):
+ cores = sum(count for _, count in self.cores)
+ disks = sum(size for _, size in self.disks_info.values())
+
+ return {'cores': cores, 'ram': self.ram_size, 'storage': disks}
+
+ def __str__(self):
+ res = []
+
+ summ = self.get_summary()
+ summary = "Simmary: {cores} cores, {ram}B RAM, {disk}B storage"
+ res.append(summary.format(cores=summ['cores'],
+ ram=utils.b2ssize(summ['ram']),
+ disk=utils.b2ssize(summ['storage'])))
+ res.append(str(self.sys_name))
+ if self.mb is not None:
+ res.append("Motherboard: " + self.mb)
+
+ if self.ram_size == 0:
+ res.append("RAM: Failed to get RAM size")
+ else:
+ res.append("RAM " + utils.b2ssize(self.ram_size) + "B")
+
+ if self.cores == []:
+ res.append("CPU cores: Failed to get CPU info")
+ else:
+ res.append("CPU cores:")
+ for name, count in self.cores:
+ if count > 1:
+ res.append(" {0} * {1}".format(count, name))
+ else:
+ res.append(" " + name)
+
+ if self.storage_controllers != []:
+ res.append("Disk controllers:")
+ for descr in self.storage_controllers:
+ res.append(" " + descr)
+
+ if self.disks_info != {}:
+ res.append("Storage devices:")
+ for dev, (model, size) in sorted(self.disks_info.items()):
+ ssize = utils.b2ssize(size) + "B"
+ res.append(" {0} {1} {2}".format(dev, ssize, model))
+ else:
+ res.append("Storage devices's: Failed to get info")
+
+ if self.disks_raw_info != {}:
+ res.append("Disks devices:")
+ for dev, descr in sorted(self.disks_raw_info.items()):
+ res.append(" {0} {1}".format(dev, descr))
+ else:
+ res.append("Disks devices's: Failed to get info")
+
+ if self.net_info != {}:
+ res.append("Net adapters:")
+ for name, (speed, dtype) in self.net_info.items():
+ res.append(" {0} {2} duplex={1}".format(name, dtype, speed))
+ else:
+ res.append("Net adapters: Failed to get net info")
+
+ return str(self.hostname) + ":\n" + "\n".join(" " + i for i in res)
+
+
+class SWInfo(object):
+ def __init__(self):
+ self.os = None
+ self.partitions = None
+ self.kernel_version = None
+ self.fio_version = None
+ self.libvirt_version = None
+ self.kvm_version = None
+ self.qemu_version = None
+ self.OS_version = None
+ self.ceph_version = None
+
+
+def get_sw_info(conn):
+ res = SWInfo()
+ return res
+
+
+def get_hw_info(conn):
+ res = HWInfo()
+ lshw_out = ssh_utils.run_over_ssh(conn, 'sudo lshw -xml 2>/dev/null',
+ nolog=True)
+
+ res.raw = lshw_out
+ lshw_et = ET.fromstring(lshw_out)
+
+ try:
+ res.hostname = lshw_et.find("node").attrib['id']
+ except:
+ pass
+
+ try:
+ res.sys_name = (lshw_et.find("node/vendor").text + " " +
+ lshw_et.find("node/product").text)
+ res.sys_name = res.sys_name.replace("(To be filled by O.E.M.)", "")
+ res.sys_name = res.sys_name.replace("(To be Filled by O.E.M.)", "")
+ except:
+ pass
+
+ core = lshw_et.find("node/node[@id='core']")
+ if core is None:
+ return
+
+ try:
+ res.mb = " ".join(core.find(node).text
+ for node in ['vendor', 'product', 'version'])
+ except:
+ pass
+
+ for cpu in core.findall("node[@class='processor']"):
+ try:
+ model = cpu.find('product').text
+ threads_node = cpu.find("configuration/setting[@id='threads']")
+ if threads_node is None:
+ threads = 1
+ else:
+ threads = int(threads_node.attrib['value'])
+ res.cores.append((model, threads))
+ except:
+ pass
+
+ res.ram_size = 0
+ for mem_node in core.findall(".//node[@class='memory']"):
+ descr = mem_node.find('description')
+ try:
+ if descr is not None and descr.text == 'System Memory':
+ mem_sz = mem_node.find('size')
+ if mem_sz is None:
+ for slot_node in mem_node.find("node[@class='memory']"):
+ slot_sz = slot_node.find('size')
+ if slot_sz is not None:
+ assert slot_sz.attrib['units'] == 'bytes'
+ res.ram_size += int(slot_sz.text)
+ else:
+ assert mem_sz.attrib['units'] == 'bytes'
+ res.ram_size += int(mem_sz.text)
+ except:
+ pass
+
+ for net in core.findall(".//node[@class='network']"):
+ try:
+ link = net.find("configuration/setting[@id='link']")
+ if link.attrib['value'] == 'yes':
+ name = net.find("logicalname").text
+ speed_node = net.find("configuration/setting[@id='speed']")
+
+ if speed_node is None:
+ speed = None
+ else:
+ speed = speed_node.attrib['value']
+
+ dup_node = net.find("configuration/setting[@id='duplex']")
+ if dup_node is None:
+ dup = None
+ else:
+ dup = dup_node.attrib['value']
+
+ res.net_info[name] = (speed, dup)
+ except:
+ pass
+
+ for controller in core.findall(".//node[@class='storage']"):
+ try:
+ description = getattr(controller.find("description"), 'text', "")
+ product = getattr(controller.find("product"), 'text', "")
+ vendor = getattr(controller.find("vendor"), 'text', "")
+ dev = getattr(controller.find("logicalname"), 'text', "")
+ if dev != "":
+ res.storage_controllers.append(
+ "{0}: {1} {2} {3}".format(dev, description,
+ vendor, product))
+ else:
+ res.storage_controllers.append(
+ "{0} {1} {2}".format(description,
+ vendor, product))
+ except:
+ pass
+
+ for disk in core.findall(".//node[@class='disk']"):
+ try:
+ lname_node = disk.find('logicalname')
+ if lname_node is not None:
+ dev = lname_node.text.split('/')[-1]
+
+ if dev == "" or dev[-1].isdigit():
+ continue
+
+ sz_node = disk.find('size')
+ assert sz_node.attrib['units'] == 'bytes'
+ sz = int(sz_node.text)
+ res.disks_info[dev] = ('', sz)
+ else:
+ description = disk.find('description').text
+ product = disk.find('product').text
+ vendor = disk.find('vendor').text
+ version = disk.find('version').text
+ serial = disk.find('serial').text
+
+ full_descr = "{0} {1} {2} {3} {4}".format(
+ description, product, vendor, version, serial)
+
+ businfo = disk.find('businfo').text
+ res.disks_raw_info[businfo] = full_descr
+ except:
+ pass
+
+ return res
+
+# import traceback
+# print ET.tostring(disk)
+# traceback.print_exc()
+
+# print get_hw_info(None)
+
+# def get_hw_info(conn):
+# res = HWInfo(None)
+# remote_run = functools.partial(ssh_utils.run_over_ssh, conn, nolog=True)
+
+# # some data
+# with conn.open_sftp() as sftp:
+# proc_data = ssh_utils.read_from_remote(sftp, '/proc/cpuinfo')
+# mem_data = ssh_utils.read_from_remote(sftp, '/proc/meminfo')
+
+# # cpu info
+# curr_core = {}
+# for line in proc_data.split("\n"):
+# if line.strip() == "":
+# if curr_core != {}:
+# res.cores.append(curr_core)
+# curr_core = {}
+# else:
+# param, val = line.split(":", 1)
+# curr_core[param.strip()] = val.strip()
+
+# if curr_core != {}:
+# res.cores.append(curr_core)
+
+# # RAM info
+# for line in mem_data.split("\n"):
+# if line.startswith("MemTotal"):
+# res.ram_size = int(line.split(":", 1)[1].split()[0]) * 1024
+# break
+
+# # HDD info
+# for dev in remote_run('ls /dev').split():
+# if dev[-1].isdigit():
+# continue
+
+# if dev.startswith('sd') or dev.startswith('hd'):
+# model = None
+# size = None
+
+# for line in remote_run('sudo hdparm -I /dev/' + dev).split("\n"):
+# if "Model Number:" in line:
+# model = line.split(':', 1)[1]
+# elif "device size with M = 1024*1024" in line:
+# size = int(line.split(':', 1)[1].split()[0])
+# size *= 1024 ** 2
+
+# res.disks_info[dev] = (model, size)
+
+# # Network info
+# separator = '*-network'
+# net_info = remote_run('sudo lshw -class network')
+# for net_dev_info in net_info.split(separator):
+# if net_dev_info.strip().startswith("DISABLED"):
+# continue
+
+# if ":" not in net_dev_info:
+# continue
+
+# dev_params = {}
+# for line in net_dev_info.split("\n"):
+# line = line.strip()
+# if ':' in line:
+# key, data = line.split(":", 1)
+# dev_params[key.strip()] = data.strip()
+
+# if 'configuration' not in dev_params:
+# print "!!!!!", net_dev_info
+# continue
+
+# conf = dev_params['configuration']
+# if 'link=yes' in conf:
+# if 'speed=' in conf:
+# speed = conf.split('speed=', 1)[1]
+# speed = speed.strip().split()[0]
+# else:
+# speed = None
+
+# if "duplex=" in conf:
+# dtype = conf.split("duplex=", 1)[1]
+# dtype = dtype.strip().split()[0]
+# else:
+# dtype = None
+
+# res.net_info[dev_params['logical name']] = (speed, dtype)
+# return res
diff --git a/wally/report.py b/wally/report.py
index eeffefc..5b4c858 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -1,5 +1,4 @@
import os
-import math
import bisect
import logging
@@ -10,10 +9,8 @@
import wally
from wally import charts
-from wally.utils import parse_creds, ssize_to_b
-from wally.statistic import round_3_digit, round_deviation
-from wally.suits.io.results_loader import process_disk_info
-from wally.meta_info import total_lab_info, collect_lab_data
+from wally.utils import ssize2b
+from wally.statistic import round_3_digit, data_property
logger = logging.getLogger("wally.report")
@@ -33,6 +30,43 @@
report_funcs = []
+class PerfInfo(object):
+ def __init__(self, name, raw, meta):
+ self.name = name
+ self.bw = None
+ self.iops = None
+ self.lat = None
+ self.raw = raw
+ self.meta = meta
+
+
+def split_and_add(data, block_size):
+ assert len(data) % block_size == 0
+ res = [0] * block_size
+
+ for idx, val in enumerate(data):
+ res[idx % block_size] += val
+
+ return res
+
+
+def process_disk_info(test_data):
+ data = {}
+ vm_count = test_data['__test_meta__']['testnodes_count']
+ for name, results in test_data['res'].items():
+ assert len(results['bw']) % vm_count == 0
+ block_count = len(results['bw']) // vm_count
+
+ pinfo = PerfInfo(name, results, test_data['__test_meta__'])
+ pinfo.bw = data_property(split_and_add(results['bw'], block_count))
+ pinfo.iops = data_property(split_and_add(results['iops'],
+ block_count))
+
+ pinfo.lat = data_property(results['lat'])
+ data[name] = pinfo
+ return data
+
+
def report(name, required_fields):
def closure(func):
report_funcs.append((required_fields.split(","), name, func))
@@ -63,7 +97,7 @@
if res.name.startswith(name_pref):
iotime = 1000000. / res.iops
iotime_max = iotime * (1 + res.dev * 3)
- bsize = ssize_to_b(res.raw['blocksize'])
+ bsize = ssize2b(res.raw['blocksize'])
plot_data[bsize] = (iotime, iotime_max)
min_sz = min(plot_data)
@@ -74,22 +108,17 @@
e = []
for k, (v, vmax) in sorted(plot_data.items()):
- # y.append(math.log10(v - min_iotime))
- # x.append(math.log10(k))
- # e.append(y[-1] - math.log10(vmax - min_iotime))
y.append(v - min_iotime)
x.append(k)
e.append(y[-1] - (vmax - min_iotime))
- print e
-
tp = 'rrd'
plt.errorbar(x, y, e, linestyle='None', label=names[tp],
color=colors[color], ecolor="black",
marker=markers[marker])
plt.yscale('log')
plt.xscale('log')
- plt.show()
+ # plt.show()
# ynew = approximate_line(ax, ay, ax, True)
# plt.plot(ax, ynew, color=colors[color])
@@ -103,50 +132,43 @@
linearity_report = report('linearity', 'linearity_test')(linearity_report)
-def render_hdd_html(dest, info, lab_description):
+def render_all_html(dest, info, lab_description, templ_name):
very_root_dir = os.path.dirname(os.path.dirname(wally.__file__))
templ_dir = os.path.join(very_root_dir, 'report_templates')
- templ_file = os.path.join(templ_dir, "report_hdd.html")
+ templ_file = os.path.join(templ_dir, templ_name)
templ = open(templ_file, 'r').read()
- for name, val in info.__dict__.items():
+ data = info.__dict__.copy()
+ for name, val in data.items():
if not name.startswith('__'):
if val is None:
- info.__dict__[name] = '-'
- else:
- info.__dict__[name] = round_3_digit(val)
+ data[name] = '-'
+ elif isinstance(val, (int, float, long)):
+ data[name] = round_3_digit(val)
- data = info.__dict__.copy()
- for k, v in data.items():
- if v is None:
- data[k] = "-"
+ data['bw_read_max'] = (data['bw_read_max'][0] // 1024,
+ data['bw_read_max'][1])
+ data['bw_write_max'] = (data['bw_write_max'][0] // 1024,
+ data['bw_write_max'][1])
report = templ.format(lab_info=lab_description, **data)
open(dest, 'w').write(report)
+def render_hdd_html(dest, info, lab_description):
+ render_all_html(dest, info, lab_description, "report_hdd.html")
+
+
def render_ceph_html(dest, info, lab_description):
- very_root_dir = os.path.dirname(os.path.dirname(wally.__file__))
- templ_dir = os.path.join(very_root_dir, 'report_templates')
- templ_file = os.path.join(templ_dir, "report_ceph.html")
- templ = open(templ_file, 'r').read()
-
- for name, val in info.__dict__.items():
- if not name.startswith('__') and isinstance(val, (int, long, float)):
- setattr(info, name, round_3_digit(val))
-
- data = info.__dict__.copy()
- for k, v in data.items():
- if v is None:
- data[k] = "-"
-
- report = templ.format(lab_info=lab_description, **data)
- open(dest, 'w').write(report)
+ render_all_html(dest, info, lab_description, "report_ceph.html")
-def io_chart(title, concurence, latv, iops_or_bw, iops_or_bw_dev,
+def io_chart(title, concurence,
+ latv, latv_min, latv_max,
+ iops_or_bw, iops_or_bw_dev,
legend, fname):
- bar_data, bar_dev = iops_or_bw, iops_or_bw_dev
+ bar_data = iops_or_bw
+ bar_dev = iops_or_bw_dev
legend = [legend]
iops_or_bw_per_vm = []
@@ -159,13 +181,14 @@
bar_dev_top.append(bar_data[i] + bar_dev[i])
bar_dev_bottom.append(bar_data[i] - bar_dev[i])
- latv = [lat / 1000 for lat in latv]
ch = charts.render_vertical_bar(title, legend, [bar_data], [bar_dev_top],
[bar_dev_bottom], file_name=fname,
scale_x=concurence, label_x="clients",
label_y=legend[0],
lines=[
(latv, "msec", "rr", "lat"),
+ # (latv_min, None, None, "lat_min"),
+ # (latv_max, None, None, "lat_max"),
(iops_or_bw_per_vm, None, None,
legend[0] + " per client")
])
@@ -191,7 +214,7 @@
make_plots(processed_results, path, plots)
-def make_plots(processed_results, path, plots):
+def make_plots(processed_results, path, plots, max_lat=400000):
for name_pref, fname, desc in plots:
chart_data = []
@@ -202,28 +225,34 @@
if len(chart_data) == 0:
raise ValueError("Can't found any date for " + name_pref)
- use_bw = ssize_to_b(chart_data[0].raw['blocksize']) > 16 * 1024
+ use_bw = ssize2b(chart_data[0].raw['blocksize']) > 16 * 1024
chart_data.sort(key=lambda x: x.raw['concurence'])
- lat = [x.lat for x in chart_data]
+ # if x.lat.average < max_lat]
+ lat = [x.lat.average / 1000 for x in chart_data]
+
+ lat_min = [x.lat.min / 1000 for x in chart_data if x.lat.min < max_lat]
+ lat_max = [x.lat.max / 1000 for x in chart_data if x.lat.max < max_lat]
+
vm_count = x.meta['testnodes_count']
concurence = [x.raw['concurence'] * vm_count for x in chart_data]
if use_bw:
- data = [x.bw for x in chart_data]
- data_dev = [x.bw * x.dev for x in chart_data]
+ data = [x.bw.average / 1000 for x in chart_data]
+ data_dev = [x.bw.confidence / 1000 for x in chart_data]
name = "BW"
else:
- data = [x.iops for x in chart_data]
- data_dev = [x.iops * x.dev for x in chart_data]
+ data = [x.iops.average for x in chart_data]
+ data_dev = [x.iops.confidence for x in chart_data]
name = "IOPS"
- io_chart(desc, concurence, lat, data, data_dev, name, fname)
+ io_chart(desc, concurence, lat, lat_min, lat_max,
+ data, data_dev, name, fname)
def find_max_where(processed_results, sync_mode, blocksize, rw, iops=True):
- result = [0, 0]
+ result = None
attr = 'iops' if iops else 'bw'
for measurement in processed_results.values():
ok = measurement.raw['sync_mode'] == sync_mode
@@ -231,8 +260,13 @@
ok = ok and (measurement.raw['rw'] == rw)
if ok:
- if getattr(measurement, attr) > result[0]:
- result = [getattr(measurement, attr), measurement.dev]
+ field = getattr(measurement, attr)
+
+ if result is None:
+ result = field
+ elif field.average > result.average:
+ result = field
+
return result
@@ -244,21 +278,27 @@
'd', '4k', 'randwrite')
di.direct_iops_r_max = find_max_where(processed_results,
'd', '4k', 'randread')
+
di.bw_write_max = find_max_where(processed_results,
'd', '16m', 'randwrite', False)
+ if di.bw_write_max is None:
+ di.bw_write_max = find_max_where(processed_results,
+ 'd', '1m', 'write', False)
+
di.bw_read_max = find_max_where(processed_results,
'd', '16m', 'randread', False)
+ if di.bw_read_max is None:
+ di.bw_read_max = find_max_where(processed_results,
+ 'd', '1m', 'read', False)
for res in processed_results.values():
if res.raw['sync_mode'] == 's' and res.raw['blocksize'] == '4k':
if res.raw['rw'] != 'randwrite':
continue
- rws4k_iops_lat_th.append((res.iops, res.lat,
+ rws4k_iops_lat_th.append((res.iops.average,
+ res.lat.average,
res.raw['concurence']))
- di.bw_write_max[0] /= 1000
- di.bw_read_max[0] /= 1000
-
rws4k_iops_lat_th.sort(key=lambda (_1, _2, conc): conc)
latv = [lat for _, lat, _ in rws4k_iops_lat_th]
@@ -274,11 +314,8 @@
lat1 = latv[pos - 1]
lat2 = latv[pos]
- th1 = rws4k_iops_lat_th[pos - 1][2]
- th2 = rws4k_iops_lat_th[pos][2]
-
- iops1 = rws4k_iops_lat_th[pos - 1][0]
- iops2 = rws4k_iops_lat_th[pos][0]
+ iops1, _, th1 = rws4k_iops_lat_th[pos - 1]
+ iops2, _, th2 = rws4k_iops_lat_th[pos]
th_lat_coef = (th2 - th1) / (lat2 - lat1)
th3 = th_lat_coef * (tlat - lat1) + th1
@@ -290,10 +327,9 @@
hdi = DiskInfo()
def pp(x):
- med, dev = round_deviation((x[0], x[1] * x[0]))
- # 3 sigma in %
- dev = int(float(dev) / med * 100)
- return (med, dev)
+ med, conf = x.rounded_average_conf()
+ conf_perc = int(float(conf) / med * 100)
+ return (med, conf_perc)
hdi.direct_iops_r_max = pp(di.direct_iops_r_max)
hdi.direct_iops_w_max = pp(di.direct_iops_w_max)
@@ -320,31 +356,16 @@
render_ceph_html(path, di, lab_info)
-def make_io_report(results, path, lab_url=None, creds=None):
- lab_info = None
-
- # if lab_url is not None:
- # username, password, tenant_name = parse_creds(creds)
- # creds = {'username': username,
- # 'password': password,
- # "tenant_name": tenant_name}
- # try:
- # data = collect_lab_data(lab_url, creds)
- # lab_info = total_lab_info(data)
- # except Exception as exc:
- # logger.warning("Can't collect lab data: {0!s}".format(exc))
-
- if lab_info is None:
- lab_info = {
- "total_disk": "None",
- "total_memory": "None",
- "nodes_count": "None",
- "processor_count": "None"
- }
+def make_io_report(dinfo, results, path, lab_info=None):
+ lab_info = {
+ "total_disk": "None",
+ "total_memory": "None",
+ "nodes_count": "None",
+ "processor_count": "None"
+ }
try:
- processed_results = process_disk_info(results)
- res_fields = sorted(processed_results.keys())
+ res_fields = sorted(dinfo.keys())
for fields, name, func in report_funcs:
for field in fields:
pos = bisect.bisect_left(res_fields, field)
@@ -357,7 +378,7 @@
else:
hpath = path.format(name)
logger.debug("Generatins report " + name + " into " + hpath)
- func(processed_results, hpath, lab_info)
+ func(dinfo, hpath, lab_info)
break
else:
logger.warning("No report generator found for this load")
diff --git a/wally/run_test.py b/wally/run_test.py
index 8c1fece..72ba4cf 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -17,8 +17,9 @@
from concurrent.futures import ThreadPoolExecutor
from wally import pretty_yaml
+from wally.hw_info import get_hw_info
+from wally.discover import discover, Node
from wally.timeseries import SensorDatastore
-from wally.discover import discover, Node, undiscover
from wally import utils, report, ssh_utils, start_vms
from wally.suits.itest import IOPerfTest, PgBenchTest, MysqlTest
from wally.sensors_utils import deploy_sensors_stage
@@ -49,6 +50,7 @@
self.clear_calls_stack = []
self.openstack_nodes_ids = []
self.sensors_mon_q = None
+ self.hw_info = []
def connect_one(node, vm=False):
@@ -85,6 +87,34 @@
list(pool.map(connect_one_f, nodes))
+def collect_hw_info_stage(cfg, ctx):
+ if os.path.exists(cfg['hwreport_fname']):
+ msg = "{0} already exists. Skip hw info"
+ logger.info(msg.format(cfg['hwreport_fname']))
+ return
+
+ with ThreadPoolExecutor(32) as pool:
+ connections = (node.connection for node in ctx.nodes)
+ ctx.hw_info.extend(pool.map(get_hw_info, connections))
+
+ with open(cfg['hwreport_fname'], 'w') as hwfd:
+ for node, info in zip(ctx.nodes, ctx.hw_info):
+ hwfd.write("-" * 60 + "\n")
+ hwfd.write("Roles : " + ", ".join(node.roles) + "\n")
+ hwfd.write(str(info) + "\n")
+ hwfd.write("-" * 60 + "\n\n")
+
+ if info.hostname is not None:
+ fname = os.path.join(
+ cfg_dict['hwinfo_directory'],
+ info.hostname + "_lshw.xml")
+
+ with open(fname, "w") as fd:
+ fd.write(info.raw)
+ logger.info("Hardware report stored in " + cfg['hwreport_fname'])
+ logger.debug("Raw hardware info in " + cfg['hwinfo_directory'] + " folder")
+
+
def test_thread(test, node, barrier, res_q):
exc = None
try:
@@ -200,7 +230,7 @@
# logger.warning("Some test threads still running")
gather_results(res_q, results)
- result = test.merge_results(results)
+ result = test_cls.merge_results(results)
result['__test_meta__'] = {'testnodes_count': len(test_nodes)}
yield name, result
@@ -244,22 +274,51 @@
if cfg.get('discover') is not None:
discover_objs = [i.strip() for i in cfg['discover'].strip().split(",")]
- nodes, clean_data = discover(ctx,
- discover_objs,
- cfg['clouds'],
- cfg['var_dir'],
- not cfg['dont_discover_nodes'])
+ nodes = discover(ctx,
+ discover_objs,
+ cfg['clouds'],
+ cfg['var_dir'],
+ not cfg['dont_discover_nodes'])
- def undiscover_stage(cfg, ctx):
- undiscover(clean_data)
-
- ctx.clear_calls_stack.append(undiscover_stage)
ctx.nodes.extend(nodes)
for url, roles in cfg.get('explicit_nodes', {}).items():
ctx.nodes.append(Node(url, roles.split(",")))
+def save_nodes_stage(cfg, ctx):
+ cluster = {}
+ for node in ctx.nodes:
+ roles = node.roles[:]
+ if 'testnode' in roles:
+ roles.remove('testnode')
+
+ if len(roles) != 0:
+ cluster[node.conn_url] = roles
+
+ with open(cfg['nodes_report_file'], "w") as fd:
+ fd.write(pretty_yaml.dumps(cluster))
+
+
+def reuse_vms_stage(vm_name_pattern, conn_pattern):
+ def reuse_vms(cfg, ctx):
+ try:
+ msg = "Looking for vm with name like {0}".format(vm_name_pattern)
+ logger.debug(msg)
+
+ os_creds = get_OS_credentials(cfg, ctx, "clouds")
+ conn = start_vms.nova_connect(**os_creds)
+ for ip in start_vms.find_vms(conn, vm_name_pattern):
+ node = Node(conn_pattern.format(ip=ip), ['testnode'])
+ ctx.nodes.append(node)
+ except Exception as exc:
+ msg = "Vm like {0} lookup failed".format(vm_name_pattern)
+ logger.exception(msg)
+ raise utils.StopTestError(msg, exc)
+
+ return reuse_vms
+
+
def get_OS_credentials(cfg, ctx, creds_type):
creds = None
@@ -332,7 +391,6 @@
os_creds_type = config['creds']
os_creds = get_OS_credentials(cfg, ctx, os_creds_type)
-
start_vms.nova_connect(**os_creds)
logger.info("Preparing openstack")
@@ -453,8 +511,9 @@
def console_report_stage(cfg, ctx):
for tp, data in ctx.results:
if 'io' == tp and data is not None:
+ dinfo = report.process_disk_info(data)
print("\n")
- print(IOPerfTest.format_for_console(data))
+ print(IOPerfTest.format_for_console(data, dinfo))
print("\n")
if tp in ['mysql', 'pgbench'] and data is not None:
print("\n")
@@ -464,28 +523,26 @@
def html_report_stage(cfg, ctx):
html_rep_fname = cfg['html_report_file']
+ found = False
+ for tp, data in ctx.results:
+ if 'io' == tp and data is not None:
+ if found:
+ logger.error("Making reports for more than one " +
+ "io block isn't supported! All " +
+ "report, except first are skipped")
+ continue
+ found = True
+ dinfo = report.process_disk_info(data)
+ report.make_io_report(dinfo, data, html_rep_fname,
+ lab_info=ctx.hw_info)
- try:
- fuel_url = cfg['clouds']['fuel']['url']
- except KeyError:
- fuel_url = None
-
- try:
- creds = cfg['clouds']['fuel']['creds']
- except KeyError:
- creds = None
-
- report.make_io_report(ctx.results, html_rep_fname, fuel_url, creds=creds)
-
- text_rep_fname = cfg_dict['text_report_file']
- with open(text_rep_fname, "w") as fd:
- for tp, data in ctx.results:
- if 'io' == tp and data is not None:
- fd.write(IOPerfTest.format_for_console(data))
+ text_rep_fname = cfg_dict['text_report_file']
+ with open(text_rep_fname, "w") as fd:
+ fd.write(IOPerfTest.format_for_console(data, dinfo))
fd.write("\n")
fd.flush()
- logger.info("Text report were stored in " + text_rep_fname)
+ logger.info("Text report were stored in " + text_rep_fname)
def complete_log_nodes_statistic(cfg, ctx):
@@ -540,8 +597,9 @@
default=False)
parser.add_argument("-r", '--no-html-report', action='store_true',
help="Skip html report", default=False)
- parser.add_argument("--params", nargs="*", metavar="testname.paramname",
+ parser.add_argument("--params", metavar="testname.paramname",
help="Test params", default=[])
+ parser.add_argument("--reuse-vms", default=None, metavar="vm_name_prefix")
parser.add_argument("config_file")
return parser.parse_args(argv[1:])
@@ -549,6 +607,7 @@
def main(argv):
opts = parse_args(argv)
+ load_config(opts.config_file, opts.post_process_only)
if opts.post_process_only is not None:
stages = [
@@ -556,13 +615,26 @@
]
else:
stages = [
- discover_stage,
+ discover_stage
+ ]
+
+ if opts.reuse_vms is not None:
+ pref, ssh_templ = opts.reuse_vms.split(',', 1)
+ stages.append(reuse_vms_stage(pref, ssh_templ))
+
+ stages.extend([
log_nodes_statistic,
- connect_stage,
+ save_nodes_stage,
+ connect_stage])
+
+ if cfg_dict.get('collect_info', True):
+ stages.append(collect_hw_info_stage)
+
+ stages.extend([
deploy_sensors_stage,
run_tests_stage,
store_raw_results_stage
- ]
+ ])
report_stages = [
console_report_stage,
@@ -571,8 +643,6 @@
if not opts.no_html_report:
report_stages.append(html_report_stage)
- load_config(opts.config_file, opts.post_process_only)
-
if cfg_dict.get('logging', {}).get("extra_logs", False) or opts.extra_logs:
level = logging.DEBUG
else:
@@ -614,8 +684,17 @@
try:
logger.info("Start {0.__name__} stage".format(stage))
stage(cfg_dict, ctx)
- except utils.StopTestError as exc:
- logger.error(msg_templ.format(stage, exc))
+ except utils.StopTestError as cleanup_exc:
+ logger.error(msg_templ.format(stage, cleanup_exc))
+ except Exception:
+ logger.exception(msg_templ_no_exc.format(stage))
+
+ logger.debug("Start utils.cleanup")
+ for clean_func, args, kwargs in utils.iter_clean_func():
+ try:
+ clean_func(*args, **kwargs)
+ except utils.StopTestError as cleanup_exc:
+ logger.error(msg_templ.format(stage, cleanup_exc))
except Exception:
logger.exception(msg_templ_no_exc.format(stage))
diff --git a/wally/sensors/grafana.py b/wally/sensors/grafana.py
deleted file mode 100644
index 9823fac..0000000
--- a/wally/sensors/grafana.py
+++ /dev/null
@@ -1,47 +0,0 @@
-import json
-
-
-query = """
-select value from "{series}"
-where $timeFilter and
-host='{host}' and device='{device}'
-order asc
-"""
-
-
-def make_dashboard_file(config):
- series = ['writes_completed', 'sectors_written']
- dashboards = []
-
- for serie in series:
- dashboard = dict(title=serie, type='graph',
- span=12, fill=1, linewidth=2,
- tooltip={'shared': True})
-
- targets = []
-
- for ip, devs in config.items():
- for device in devs:
- params = {
- 'series': serie,
- 'host': ip,
- 'device': device
- }
-
- target = dict(
- target="disk io",
- query=query.replace("\n", " ").format(**params).strip(),
- interval="",
- alias="{0} io {1}".format(ip, device),
- rawQuery=True
- )
- targets.append(target)
-
- dashboard['targets'] = targets
- dashboards.append(dashboard)
-
- fc = open("grafana_template.js").read()
- return fc % (json.dumps(dashboards),)
-
-
-print make_dashboard_file({'192.168.0.104': ['sda1', 'rbd1']})
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
index 55a9584..2d0bc81 100644
--- a/wally/sensors/main.py
+++ b/wally/sensors/main.py
@@ -56,6 +56,7 @@
prev = {}
next_data_record_time = time.time()
+ first_round = True
while True:
real_time = int(time.time())
@@ -84,8 +85,12 @@
if source_id is not None:
curr['source_id'] = source_id
- print report_time, int((report_time - time.time()) * 10) * 0.1
- sender.send(curr)
+ # on first round not all fields was ready
+ # this leads to setting wrong headers list
+ if not first_round:
+ sender.send(curr)
+ else:
+ first_round = False
next_data_record_time = report_time + opts.timeout + 0.5
time.sleep(next_data_record_time - time.time())
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
index fad7e00..67aef2a 100644
--- a/wally/sensors/protocol.py
+++ b/wally/sensors/protocol.py
@@ -62,6 +62,8 @@
result = (self.HEADERS + source_id +
self.END_OF_SOURCE_ID +
+ socket.gethostname() +
+ self.END_OF_SOURCE_ID +
flen + forder + self.END_OF_HEADERS)
if self.headers_send_cycles_left > 0:
@@ -85,13 +87,14 @@
def __init__(self):
self.fields = {}
self.formats = {}
+ self.hostnames = {}
def unpack(self, data):
code = data[0]
- source_id, _, packed_data = data[1:].partition(
- StructSerializerSend.END_OF_SOURCE_ID)
if code == StructSerializerSend.HEADERS:
+ source_id, hostname, packed_data = data[1:].split(
+ StructSerializerSend.END_OF_SOURCE_ID, 2)
# fields order provided
flen_sz = struct.calcsize("!H")
flen = struct.unpack("!H", packed_data[:flen_sz])[0]
@@ -111,11 +114,14 @@
else:
self.fields[source_id] = ['time'] + forder
self.formats[source_id] = "!I" + "I" * flen
+ self.hostnames[source_id] = hostname
if len(rest) != 0:
return self.unpack(rest)
return None
else:
+ source_id, packed_data = data[1:].split(
+ StructSerializerSend.END_OF_SOURCE_ID, 1)
assert code == StructSerializerSend.DATA,\
"Unknown code {0!r}".format(code)
@@ -133,6 +139,7 @@
vals = struct.unpack(s_format, packed_data)
res = dict(zip(fields, vals))
res['source_id'] = source_id
+ res['hostname'] = self.hostnames[source_id]
return res
diff --git a/wally/sensors/receiver.py b/wally/sensors/receiver.py
deleted file mode 100644
index ff0f223..0000000
--- a/wally/sensors/receiver.py
+++ /dev/null
@@ -1,19 +0,0 @@
-from .api import start_monitoring, Empty
-# from influx_exporter import connect, add_data
-
-uri = "udp://192.168.0.104:12001"
-# infldb_url = "influxdb://perf:perf@192.168.152.42:8086/perf"
-# conn = connect(infldb_url)
-
-monitor_config = {'127.0.0.1':
- {"block-io": {'allowed_prefixes': ['sda1', 'rbd1']},
- "net-io": {"allowed_prefixes": ["virbr2"]}}}
-
-with start_monitoring(uri, monitor_config) as queue:
- while True:
- try:
- (ip, port), data = queue.get(True, 1)
- print (ip, port), data
- # add_data(conn, ip, [data])
- except Empty:
- pass
diff --git a/wally/sensors/sensors/io_sensors.py b/wally/sensors/sensors/io_sensors.py
index c9ff340..e4049a9 100644
--- a/wally/sensors/sensors/io_sensors.py
+++ b/wally/sensors/sensors/io_sensors.py
@@ -38,6 +38,8 @@
dev_ok = is_dev_accepted(dev_name,
disallowed_prefixes,
allowed_prefixes)
+ if dev_name[-1].isdigit():
+ dev_ok = False
if dev_ok:
for pos, name, accum_val in io_values_pos:
diff --git a/wally/sensors/sensors/net_sensors.py b/wally/sensors/sensors/net_sensors.py
index 4a4e477..2723499 100644
--- a/wally/sensors/sensors/net_sensors.py
+++ b/wally/sensors/sensors/net_sensors.py
@@ -25,7 +25,7 @@
@provides("net-io")
-def net_stat(disallowed_prefixes=('docker',), allowed_prefixes=None):
+def net_stat(disallowed_prefixes=('docker', 'lo'), allowed_prefixes=('eth',)):
results = {}
for line in open('/proc/net/dev').readlines()[2:]:
@@ -36,6 +36,10 @@
dev_ok = is_dev_accepted(dev_name,
disallowed_prefixes,
allowed_prefixes)
+
+ if '.' in dev_name and dev_name.split('.')[-1].isdigit():
+ dev_ok = False
+
if dev_ok:
for pos, name, accum_val in net_values_pos:
sensor_name = "{0}.{1}".format(dev_name, name)
diff --git a/wally/sensors/sensors/psram_sensors.py b/wally/sensors/sensors/psram_sensors.py
index 7e4f8b9..faac87d 100644
--- a/wally/sensors/sensors/psram_sensors.py
+++ b/wally/sensors/sensors/psram_sensors.py
@@ -46,7 +46,6 @@
def psram_stat(disallowed_prefixes=None, allowed_prefixes=None):
results = {}
pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
- print pid_list
for pid in pid_list:
try:
dev_name = get_pid_name(pid)
diff --git a/wally/sensors/sensors/syscpu_sensors.py b/wally/sensors/sensors/syscpu_sensors.py
index 3324f67..3f6cf44 100644
--- a/wally/sensors/sensors/syscpu_sensors.py
+++ b/wally/sensors/sensors/syscpu_sensors.py
@@ -1,5 +1,5 @@
+from .utils import SensorInfo
from ..discover import provides
-from .utils import SensorInfo, is_dev_accepted
# 0 - cpu name
# 1 - user: normal processes executing in user mode
@@ -17,32 +17,27 @@
(4, 'idle_time', True),
]
-# extended values, on 1 pos in line
-cpu_extvalues = ['procs_blocked']
-
@provides("system-cpu")
-def syscpu_stat(disallowed_prefixes=('intr', 'ctxt', 'btime', 'processes',
- 'procs_running', 'softirq'),
- allowed_prefixes=None):
+def syscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
results = {}
+ # calculate core count
+ core_count = 0
+
for line in open('/proc/stat'):
vals = line.split()
dev_name = vals[0]
- dev_ok = is_dev_accepted(dev_name,
- disallowed_prefixes,
- allowed_prefixes)
+ if dev_name == 'cpu':
+ for pos, name, accum_val in io_values_pos:
+ sensor_name = "{0}.{1}".format(dev_name, name)
+ results[sensor_name] = SensorInfo(int(vals[pos]),
+ accum_val)
+ elif dev_name == 'procs_blocked':
+ val = int(vals[1]) // core_count
+ results["cpu.procs_blocked"] = SensorInfo(val, False)
+ elif dev_name.startswith('cpu'):
+ core_count += 1
- if dev_ok:
- if dev_name in cpu_extvalues:
- # for single values
- sensor_name = "cpu.{0}".format(dev_name)
- results[sensor_name] = SensorInfo(int(vals[1]), False)
- else:
- for pos, name, accum_val in io_values_pos:
- sensor_name = "{0}.{1}".format(dev_name, name)
- results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
return results
-
diff --git a/wally/sensors/sensors/sysram_sensors.py b/wally/sensors/sensors/sysram_sensors.py
index 8df7acc..2eacf44 100644
--- a/wally/sensors/sensors/sysram_sensors.py
+++ b/wally/sensors/sensors/sysram_sensors.py
@@ -39,5 +39,3 @@
usage = used / results['ram.MemTotal'].value
results["ram.usage_percent"] = SensorInfo(usage, False)
return results
-
-print sysram_stat()
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
index 20135b6..81a2832 100644
--- a/wally/sensors_utils.py
+++ b/wally/sensors_utils.py
@@ -26,21 +26,19 @@
break
addr, data = val
-
if addr not in observed_nodes:
mon_q.put(addr + (data['source_id'],))
observed_nodes.add(addr)
fd.write(repr((addr, data)) + "\n")
- source_id = data.pop('source_id')
- rep_time = data.pop('time')
-
- if 'testnode' in source2roles_map.get(source_id, []):
- sum_io_q = 0
- data_store.update_values(rep_time,
- {"testnodes:io": sum_io_q},
- add=True)
+ # source_id = data.pop('source_id')
+ # rep_time = data.pop('time')
+ # if 'testnode' in source2roles_map.get(source_id, []):
+ # sum_io_q = 0
+ # data_store.update_values(rep_time,
+ # {"testnodes:io": sum_io_q},
+ # add=True)
except Exception:
logger.exception("Error in sensors thread")
logger.info("Sensors thread exits")
@@ -62,8 +60,8 @@
for node in nodes:
if role in node.roles:
- if node.monitor_url is not None:
- monitor_url = node.monitor_url
+ if node.monitor_ip is not None:
+ monitor_url = receiver_url.format(ip=node.monitor_ip)
else:
ip = node.get_ip()
ext_ip = utils.get_ip_for_target(ip)
@@ -104,7 +102,8 @@
return mon_q
-def deploy_sensors_stage(cfg, ctx, nodes=None, undeploy=True):
+def deploy_sensors_stage(cfg, ctx, nodes=None, undeploy=True,
+ recv_timeout=10, ignore_nodata=False):
if 'sensors' not in cfg:
return
@@ -134,18 +133,22 @@
ctx.clear_calls_stack.append(remove_sensors_stage)
- logger.info("Deploing new sensors on {0} node(s)".format(len(nodes)))
+ num_monitoref_nodes = len(sensors_configs)
+ logger.info("Deploing new sensors on {0} node(s)".format(
+ num_monitoref_nodes))
+
deploy_and_start_sensors(sensors_configs)
- wait_for_new_sensors_data(ctx, monitored_nodes)
+ wait_for_new_sensors_data(ctx, monitored_nodes, recv_timeout,
+ ignore_nodata)
-def wait_for_new_sensors_data(ctx, monitored_nodes):
- MAX_WAIT_FOR_SENSORS = 10
- etime = time.time() + MAX_WAIT_FOR_SENSORS
+def wait_for_new_sensors_data(ctx, monitored_nodes, recv_timeout,
+ ignore_nodata):
+ etime = time.time() + recv_timeout
msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
nodes_ids = set(node.get_conn_id() for node in monitored_nodes)
- logger.debug(msg.format(MAX_WAIT_FOR_SENSORS, len(nodes_ids)))
+ logger.debug(msg.format(recv_timeout, len(nodes_ids)))
# wait till all nodes start sending data
while len(nodes_ids) != 0:
@@ -153,9 +156,12 @@
try:
source_id = ctx.sensors_mon_q.get(True, tleft)[2]
except Queue.Empty:
- msg = "Node {0} not sending any sensor data in {1}s"
- msg = msg.format(", ".join(nodes_ids), MAX_WAIT_FOR_SENSORS)
- raise RuntimeError(msg)
+ if not ignore_nodata:
+ msg = "Node(s) {0} not sending any sensor data in {1}s"
+ msg = msg.format(", ".join(nodes_ids), recv_timeout)
+ raise RuntimeError(msg)
+ else:
+ return
if source_id not in nodes_ids:
msg = "Receive sensors from extra node: {0}".format(source_id)
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index 7a37c4b..7b6d593 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -5,6 +5,7 @@
import logging
import os.path
import getpass
+import StringIO
import threading
import subprocess
@@ -64,6 +65,15 @@
return False
+NODE_KEYS = {}
+
+
+def set_key_for_node(host_port, key):
+ sio = StringIO.StringIO(key)
+ NODE_KEYS[host_port] = paramiko.RSAKey.from_private_key(sio)
+ sio.close()
+
+
def ssh_connect(creds, conn_timeout=60):
if creds == 'local':
return Local
@@ -101,6 +111,14 @@
look_for_keys=False,
port=creds.port,
banner_timeout=c_banner_timeout)
+ elif (creds.host, creds.port) in NODE_KEYS:
+ ssh.connect(creds.host,
+ username=creds.user,
+ timeout=c_tcp_timeout,
+ pkey=NODE_KEYS[(creds.host, creds.port)],
+ look_for_keys=False,
+ port=creds.port,
+ banner_timeout=c_banner_timeout)
else:
key_file = os.path.expanduser('~/.ssh/id_rsa')
ssh.connect(creds.host,
diff --git a/wally/start_vms.py b/wally/start_vms.py
index 6ce91f8..4e0698c 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -109,6 +109,16 @@
time.sleep(10)
+def find_vms(nova, name_prefix):
+ for srv in nova.servers.list():
+ if srv.name.startswith(name_prefix):
+ for ips in srv.addresses.values():
+ for ip in ips:
+ if ip.get("OS-EXT-IPS:type", None) == 'floating':
+ yield ip['addr']
+ break
+
+
def prepare_os(nova, params):
allow_ssh(nova, params['security_group'])
@@ -368,13 +378,15 @@
logger.debug(msg.format(srv))
nova.servers.delete(srv)
- for j in range(120):
- all_id = set(alive_srv.id for alive_srv in nova.servers.list())
- if srv.id not in all_id:
- break
- time.sleep(1)
- else:
- raise RuntimeError("Server {0} delete timeout".format(srv.id))
+ try:
+ for j in range(120):
+ srv = nova.servers.get(srv.id)
+ time.sleep(1)
+ else:
+ msg = "Server {0} delete timeout".format(srv.id)
+ raise RuntimeError(msg)
+ except NotFound:
+ pass
else:
break
else:
diff --git a/wally/statistic.py b/wally/statistic.py
index a02033d..5a9d163 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -2,9 +2,11 @@
import itertools
try:
+ from scipy import stats
from numpy import array, linalg
from scipy.optimize import leastsq
from numpy.polynomial.chebyshev import chebfit, chebval
+ no_numpy = False
except ImportError:
no_numpy = True
@@ -19,12 +21,6 @@
return round_deviation((val, val / 10.0))[0]
-def round_deviation_p(med_dev):
- med, dev = med_dev
- med, dev = round_deviation((med, med * dev))
- return [med, float(dev) / med]
-
-
def round_deviation(med_dev):
med, dev = med_dev
@@ -136,3 +132,46 @@
should returns amount of measurements to get results (avg and deviation)
with error less, that max_diff in at least req_probability% cases
"""
+
+
+class StatProps(object):
+ def __init__(self):
+ self.average = None
+ self.mediana = None
+ self.perc_95 = None
+ self.perc_5 = None
+ self.deviation = None
+ self.confidence = None
+ self.min = None
+ self.max = None
+
+ def rounded_average_conf(self):
+ return round_deviation((self.average, self.confidence))
+
+
+def data_property(data, confidence=0.95):
+ res = StatProps()
+ if len(data) == 0:
+ return res
+
+ data = sorted(data)
+ res.average, res.deviation = med_dev(data)
+ res.max = data[-1]
+ res.min = data[0]
+
+ ln = len(data)
+ if ln % 2 == 0:
+ res.mediana = (data[ln / 2] + data[ln / 2 - 1]) / 2
+ else:
+ res.mediana = data[ln / 2]
+
+ res.perc_95 = data[int((ln - 1) * 0.95)]
+ res.perc_5 = data[int((ln - 1) * 0.05)]
+
+ if not no_numpy and ln >= 3:
+ res.confidence = stats.sem(data) * \
+ stats.t.ppf((1 + confidence) / 2, ln - 1)
+ else:
+ res.confidence = res.deviation
+
+ return res
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index 51eb2fd..f6c3308 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -373,8 +373,10 @@
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
+ start_time = time.time()
# set timeout
raw_out, raw_err = p.communicate(benchmark_config)
+ end_time = time.time()
# HACK
raw_out = "{" + raw_out.split('{', 1)[1]
@@ -395,7 +397,7 @@
raw_out = raw_out[:100]
raise ValueError(msg.format(raw_out, exc))
- return zip(parsed_out, config_slice)
+ return zip(parsed_out, config_slice), (start_time, end_time)
def add_job_results(section, job_output, res):
@@ -445,13 +447,16 @@
curr_test_num = 0
executed_tests = 0
result = {}
+ timings = []
for i, test_slice in enumerate(sliced_list):
- res_cfg_it = do_run_fio(test_slice)
+ res_cfg_it, slice_timings = do_run_fio(test_slice)
res_cfg_it = enumerate(res_cfg_it, curr_test_num)
+ section_names = []
for curr_test_num, (job_output, section) in res_cfg_it:
executed_tests += 1
+ section_names.append(section.name)
if raw_results_func is not None:
raw_results_func(executed_tests,
@@ -465,6 +470,7 @@
add_job_results(section, job_output, result)
+ timings.append((section_names, slice_timings))
curr_test_num += 1
msg_template = "Done {0} tests from {1}. ETA: {2}"
@@ -475,7 +481,7 @@
test_left,
sec_to_str(time_eta))
- return result, executed_tests
+ return result, executed_tests, timings
def run_benchmark(binary_tp, *argv, **kwargs):
@@ -605,11 +611,13 @@
rrfunc = raw_res_func if argv_obj.show_raw_results else None
stime = time.time()
- job_res, num_tests = run_benchmark(argv_obj.type,
- sliced_it, rrfunc)
+ job_res, num_tests, timings = run_benchmark(argv_obj.type,
+ sliced_it, rrfunc)
etime = time.time()
- res = {'__meta__': {'raw_cfg': job_cfg, 'params': params},
+ res = {'__meta__': {'raw_cfg': job_cfg,
+ 'params': params,
+ 'timings': timings},
'res': job_res}
oformat = 'json' if argv_obj.json else 'eval'
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index 425696a..5593181 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -12,8 +12,8 @@
NUMJOBS_SHORT={% 1, 2, 3, 10 %}
size=30G
-ramp_time=5
-runtime=30
+ramp_time=15
+runtime=60
# ---------------------------------------------------------------------
# check different thread count, sync mode. (latency, iops) = func(th_count)
diff --git a/wally/suits/io/formatter.py b/wally/suits/io/formatter.py
index 7fbe70b..09565be 100644
--- a/wally/suits/io/formatter.py
+++ b/wally/suits/io/formatter.py
@@ -1,33 +1,34 @@
import texttable
-from wally.utils import ssize_to_b
+from wally.utils import ssize2b
+from wally.statistic import round_3_digit
from wally.suits.io.agent import get_test_summary
-from wally.statistic import med_dev, round_deviation, round_3_digit
def key_func(k_data):
- _, data = k_data
-
+ name, data = k_data
return (data['rw'],
data['sync_mode'],
- ssize_to_b(data['blocksize']),
- data['concurence'])
+ ssize2b(data['blocksize']),
+ data['concurence'],
+ name)
-def format_results_for_console(test_set):
+def format_results_for_console(test_set, dinfo):
"""
create a table with io performance report
for console
"""
tab = texttable.Texttable(max_width=120)
tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
- tab.set_cols_align(["l", "r", "r", "r", "r", "r", "r"])
+ tab.set_cols_align(["l", "l", "r", "r", "r", "r", "r", "r"])
+
+ items = sorted(test_set['res'].items(), key=key_func)
prev_k = None
vm_count = test_set['__test_meta__']['testnodes_count']
- items = sorted(test_set['res'].items(), key=key_func)
- header = ["Description", "iops\ncum", "KiBps\ncum",
- "iops\nper vm", "KiBps\nper vm", "Cnf\n%", "lat\nms"]
+ header = ["Name", "Description", "iops\ncum", "KiBps\ncum",
+ "Cnf\n95%", "iops\nper vm", "KiBps\nper vm", "lat\nms"]
for test_name, data in items:
@@ -36,34 +37,30 @@
if prev_k is not None:
if prev_k != curr_k:
tab.add_row(
- ["--------", "-----", "------",
- "-----", "------", "---", "-----"])
+ ["-------", "--------", "-----", "------",
+ "---", "------", "---", "-----"])
prev_k = curr_k
descr = get_test_summary(data)
+ test_dinfo = dinfo[test_name]
- iops, _ = round_deviation(med_dev(data['iops']))
- bw, bwdev = round_deviation(med_dev(data['bw']))
+ iops, _ = test_dinfo.iops.rounded_average_conf()
+ bw, bw_conf = test_dinfo.bw.rounded_average_conf()
+ conf_perc = int(round(bw_conf * 100 / bw))
- # 3 * sigma
- if 0 == bw:
- assert 0 == bwdev
- dev_perc = 0
- else:
- dev_perc = int((bwdev * 300) / bw)
+ lat, _ = test_dinfo.lat.rounded_average_conf()
+ lat = round_3_digit(int(lat) // 1000)
- med_lat, _ = round_deviation(med_dev(data['lat']))
- med_lat = int(med_lat) // 1000
+ iops_per_vm = round_3_digit(iops / float(vm_count))
+ bw_per_vm = round_3_digit(bw / float(vm_count))
iops = round_3_digit(iops)
bw = round_3_digit(bw)
- iops_cum = round_3_digit(iops * vm_count)
- bw_cum = round_3_digit(bw * vm_count)
- med_lat = round_3_digit(med_lat)
- params = (descr, int(iops_cum), int(bw_cum),
- int(iops), int(bw), dev_perc, med_lat)
+ params = (test_name.split('_', 1)[0],
+ descr, int(iops), int(bw), str(conf_perc),
+ int(iops_per_vm), int(bw_per_vm), lat)
tab.add_row(params)
tab.header(header)
diff --git a/wally/suits/io/results_loader.py b/wally/suits/io/results_loader.py
index 4dff186..988fe0e 100644
--- a/wally/suits/io/results_loader.py
+++ b/wally/suits/io/results_loader.py
@@ -1,47 +1,5 @@
import re
import json
-import collections
-
-
-# from wally.utils import ssize_to_b
-from wally.statistic import med_dev
-
-PerfInfo = collections.namedtuple('PerfInfo',
- ('name',
- 'bw', 'iops', 'dev',
- 'lat', 'lat_dev', 'raw',
- 'meta'))
-
-
-def split_and_add(data, block_size):
- assert len(data) % block_size == 0
- res = [0] * block_size
-
- for idx, val in enumerate(data):
- res[idx % block_size] += val
-
- return res
-
-
-def process_disk_info(test_output):
- data = {}
- for tp, pre_result in test_output:
- if tp != 'io' or pre_result is None:
- pass
-
- vm_count = pre_result['__test_meta__']['testnodes_count']
- for name, results in pre_result['res'].items():
- assert len(results['bw']) % vm_count == 0
- block_count = len(results['bw']) // vm_count
-
- bw, bw_dev = med_dev(split_and_add(results['bw'], block_count))
- iops, _ = med_dev(split_and_add(results['iops'],
- block_count))
- lat, lat_dev = med_dev(results['lat'])
- dev = bw_dev / float(bw)
- data[name] = PerfInfo(name, bw, iops, dev, lat, lat_dev, results,
- pre_result['__test_meta__'])
- return data
def parse_output(out_err):
@@ -96,21 +54,3 @@
else:
yield map(result.raw.get, fields_to_select)
return closure
-
-
-# def load_data(raw_data):
-# data = list(parse_output(raw_data))[0]
-
-# for key, val in data['res'].items():
-# val['blocksize_b'] = ssize_to_b(val['blocksize'])
-
-# val['iops_mediana'], val['iops_stddev'] = med_dev(val['iops'])
-# val['bw_mediana'], val['bw_stddev'] = med_dev(val['bw'])
-# val['lat_mediana'], val['lat_stddev'] = med_dev(val['lat'])
-# yield val
-
-
-# def load_files(*fnames):
-# for fname in fnames:
-# for i in load_data(open(fname).read()):
-# yield i
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
new file mode 100644
index 0000000..5593181
--- /dev/null
+++ b/wally/suits/io/rrd.cfg
@@ -0,0 +1,55 @@
+[defaults]
+wait_for_previous
+group_reporting
+time_based
+buffered=0
+iodepth=1
+softrandommap=1
+filename={FILENAME}
+NUM_ROUNDS=7
+
+NUMJOBS={% 1, 5, 10, 15, 40 %}
+NUMJOBS_SHORT={% 1, 2, 3, 10 %}
+
+size=30G
+ramp_time=15
+runtime=60
+
+# ---------------------------------------------------------------------
+# check different thread count, sync mode. (latency, iops) = func(th_count)
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+sync=1
+numjobs={NUMJOBS}
+
+# ---------------------------------------------------------------------
+# direct write
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+direct=1
+numjobs=1
+
+# ---------------------------------------------------------------------
+# check different thread count, direct read mode. (latency, iops) = func(th_count)
+# also check iops for randread
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randread
+direct=1
+numjobs={NUMJOBS}
+
+# ---------------------------------------------------------------------
+# this is essentially sequential write/read operations
+# we can't use sequential with numjobs > 1 due to caching and block merging
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=16m
+rw={% randread, randwrite %}
+direct=1
+numjobs={NUMJOBS_SHORT}
+
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
new file mode 100644
index 0000000..4a66aac
--- /dev/null
+++ b/wally/suits/io/verify.cfg
@@ -0,0 +1,38 @@
+[defaults]
+wait_for_previous
+group_reporting
+time_based
+buffered=0
+iodepth=1
+softrandommap=1
+filename={FILENAME}
+NUM_ROUNDS=1
+
+size=5G
+ramp_time=5
+runtime=360
+
+# ---------------------------------------------------------------------
+# check different thread count, sync mode. (latency, iops) = func(th_count)
+# ---------------------------------------------------------------------
+[verify_{TEST_SUMM}]
+blocksize=4m
+rw=randread
+direct=1
+numjobs=5
+
+# ---------------------------------------------------------------------
+# check different thread count, sync mode. (latency, iops) = func(th_count)
+# ---------------------------------------------------------------------
+# [verify_{TEST_SUMM}]
+# blocksize=4k
+# rw=randwrite
+# direct=1
+
+# ---------------------------------------------------------------------
+# direct write
+# ---------------------------------------------------------------------
+# [verify_{TEST_SUMM}]
+# blocksize=4k
+# rw=randread
+# direct=1
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index db5dc36..f0a1e8d 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -9,7 +9,7 @@
from paramiko import SSHException, SFTPError
import texttable
-from wally.utils import (ssize_to_b, open_for_append_or_create,
+from wally.utils import (ssize2b, open_for_append_or_create,
sec_to_str, StopTestError)
from wally.ssh_utils import (copy_paths, run_over_ssh,
@@ -214,7 +214,7 @@
files = {}
for section in self.configs:
- sz = ssize_to_b(section.vals['size'])
+ sz = ssize2b(section.vals['size'])
msz = sz / (1024 ** 2)
if sz % (1024 ** 2) != 0:
@@ -356,8 +356,7 @@
end_of_wait_time = timeout + time.time()
soft_end_of_wait_time = soft_timeout + time.time()
- # time_till_check = random.randint(30, 90)
- time_till_check = 5
+ time_till_check = random.randint(5, 10)
pid = None
is_running = False
pid_get_timeout = self.max_pig_timeout + time.time()
@@ -484,7 +483,6 @@
soft_tout = exec_time
barrier.wait()
self.run_over_ssh(cmd, nolog=nolog)
-
if self.is_primary:
templ = "Test should takes about {0}." + \
" Should finish at {1}," + \
@@ -517,7 +515,8 @@
with self.node.connection.open_sftp() as sftp:
return read_from_remote(sftp, self.log_fl)
- def merge_results(self, results):
+ @classmethod
+ def merge_results(cls, results):
if len(results) == 0:
return None
@@ -526,9 +525,12 @@
mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']
for res in results[1:]:
- assert res['__meta__'] == merged_result['__meta__']
- data = res['res']
+ mm = merged_result['__meta__']
+ assert mm['raw_cfg'] == res['__meta__']['raw_cfg']
+ assert mm['params'] == res['__meta__']['params']
+ mm['timings'].extend(res['__meta__']['timings'])
+ data = res['res']
for testname, test_data in data.items():
if testname not in merged_data:
merged_data[testname] = test_data
@@ -552,5 +554,5 @@
return merged_result
@classmethod
- def format_for_console(cls, data):
- return io_formatter.format_results_for_console(data)
+ def format_for_console(cls, data, dinfo):
+ return io_formatter.format_results_for_console(data, dinfo)
diff --git a/wally/utils.py b/wally/utils.py
index 8603f58..d5d6f48 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -23,6 +23,18 @@
return True
+class StopTestError(RuntimeError):
+ def __init__(self, reason, orig_exc=None):
+ RuntimeError.__init__(self, reason)
+ self.orig_exc = orig_exc
+
+
+def check_input_param(is_ok, message):
+ if not is_ok:
+ logger.error(message)
+ raise StopTestError(message)
+
+
def parse_creds(creds):
# parse user:passwd@host
user, passwd_host = creds.split(":", 1)
@@ -39,12 +51,6 @@
pass
-class StopTestError(RuntimeError):
- def __init__(self, reason, orig_exc=None):
- RuntimeError.__init__(self, reason)
- self.orig_exc = orig_exc
-
-
class Barrier(object):
def __init__(self, count):
self.count = count
@@ -90,7 +96,7 @@
SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4)
-def ssize_to_b(ssize):
+def ssize2b(ssize):
try:
if isinstance(ssize, (int, long)):
return ssize
@@ -103,6 +109,26 @@
raise ValueError("Unknow size format {0!r}".format(ssize))
+RSMAP = [('K', 1024),
+ ('M', 1024 ** 2),
+ ('G', 1024 ** 3),
+ ('T', 1024 ** 4)]
+
+
+def b2ssize(size):
+ if size < 1024:
+ return str(size)
+
+ for name, scale in RSMAP:
+ if size < 1024 * scale:
+ if size % scale == 0:
+ return "{0} {1}i".format(size // scale, name)
+ else:
+ return "{0:.1f} {1}i".format(float(size) / scale, name)
+
+ return "{0}{1}i".format(size // scale, name)
+
+
def get_ip_for_target(target_ip):
if not is_ip(target_ip):
target_ip = socket.gethostbyname(target_ip)
@@ -165,3 +191,15 @@
return res
return data
+
+
+CLEANING = []
+
+
+def clean_resource(func, *args, **kwargs):
+ CLEANING.append((func, args, kwargs))
+
+
+def iter_clean_func():
+ while CLEANING != []:
+ yield CLEANING.pop()