Very large refactoring: restructure the tool into the wally package (config, node discovery, sensors, charts, reports, Fuel/OpenStack REST API, test runner)
diff --git a/wally/__init__.py b/wally/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wally/__init__.py
diff --git a/wally/__main__.py b/wally/__main__.py
new file mode 100644
index 0000000..97011d9
--- /dev/null
+++ b/wally/__main__.py
@@ -0,0 +1,6 @@
+import sys
+from .run_test import main
+
+
+if __name__ == '__main__':
+    sys.exit(main(sys.argv))
diff --git a/wally/assumptions_check.py b/wally/assumptions_check.py
new file mode 100644
index 0000000..41ae2e0
--- /dev/null
+++ b/wally/assumptions_check.py
@@ -0,0 +1,170 @@
+import sys
+
+import texttable as TT
+
+import numpy as np
+import matplotlib.pyplot as plt
+from numpy.polynomial.chebyshev import chebfit, chebval
+
+from .io_results_loader import load_data, filter_data
+from .statistic import approximate_line, difference
+
+
+def linearity_plot(data, types, vals=None):
+ fields = 'blocksize_b', 'iops_mediana', 'iops_stddev'
+
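+    # map a short test type code (e.g. "rrd") to a readable
+    # name ("rand read direct")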
+ names = {}
+ for tp1 in ('rand', 'seq'):
+ for oper in ('read', 'write'):
+ for sync in ('sync', 'direct', 'async'):
+ sq = (tp1, oper, sync)
+ name = "{0} {1} {2}".format(*sq)
+ names["".join(word[0] for word in sq)] = name
+
+ colors = ['red', 'green', 'blue', 'cyan',
+ 'magenta', 'black', 'yellow', 'burlywood']
+ markers = ['*', '^', 'x', 'o', '+', '.']
+ color = 0
+ marker = 0
+
+ for tp in types:
+ filtered_data = filter_data('linearity_test_' + tp, fields)
+ x = []
+ y = []
+ e = []
+ # values to make line
+ ax = []
+ ay = []
+
+ for sz, med, dev in sorted(filtered_data(data)):
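+            # convert median iops to the mean time of one request (ms);
+            # (med - 3 * stddev) gives a pessimistic (slowest) estimate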
+ iotime_ms = 1000. // med
+ iotime_max = 1000. // (med - dev * 3)
+
+ x.append(sz / 1024.0)
+ y.append(iotime_ms)
+ e.append(iotime_max - iotime_ms)
+ if vals is None or sz in vals:
+ ax.append(sz / 1024.0)
+ ay.append(iotime_ms)
+
+ plt.errorbar(x, y, e, linestyle='None', label=names[tp],
+ color=colors[color], ecolor="black",
+ marker=markers[marker])
+ ynew = approximate_line(ax, ay, ax, True)
+ plt.plot(ax, ynew, color=colors[color])
+ color += 1
+ marker += 1
+ plt.legend(loc=2)
+    ndots = len(vals) if vals is not None else len(x)
+    plt.title("Linearity test by %i points" % ndots)
+
+
+def linearity_table(data, types, vals):
+    """ Create a table with differences between
+    original and approximated values
+    vals - blocksizes used to build the approximation line"""
+ fields = 'blocksize_b', 'iops_mediana'
+ for tp in types:
+ filtered_data = filter_data('linearity_test_' + tp, fields)
+ # all values
+ x = []
+ y = []
+ # values to make line
+ ax = []
+ ay = []
+
+ for sz, med in sorted(filtered_data(data)):
+ iotime_ms = 1000. // med
+ x.append(sz / 1024.0)
+ y.append(iotime_ms)
+ if sz in vals:
+ ax.append(sz / 1024.0)
+ ay.append(iotime_ms)
+
+ ynew = approximate_line(ax, ay, x, True)
+
+ dif, _, _ = difference(y, ynew)
+ table_data = []
+ for i, d in zip(x, dif):
+ row = ["{0:.1f}".format(i), "{0:.1f}".format(d[0]), "{0:.0f}".format(d[1]*100)]
+ table_data.append(row)
+
+ tab = TT.Texttable()
+ tab.set_deco(tab.VLINES)
+
+ header = ["BlockSize, kB", "Absolute difference (ms)", "Relative difference (%)"]
+ tab.add_row(header)
+ tab.header = header
+
+ for row in table_data:
+ tab.add_row(row)
+
+ # uncomment to get table in pretty pictures :)
+ # colLabels = ("BlockSize, kB", "Absolute difference (ms)", "Relative difference (%)")
+ # fig = plt.figure()
+ # ax = fig.add_subplot(111)
+ # ax.axis('off')
+ # #do the table
+ # the_table = ax.table(cellText=table_data,
+ # colLabels=colLabels,
+ # loc='center')
+ # plt.savefig(tp+".png")
+
+
+def th_plot(data, tt):
+ fields = 'concurence', 'iops_mediana', 'lat_mediana'
+ conc_4k = filter_data('concurrence_test_' + tt, fields, blocksize='4k')
+ filtered_data = sorted(list(conc_4k(data)))
+
+ x, iops, lat = zip(*filtered_data)
+
+ _, ax1 = plt.subplots()
+
+ xnew = np.linspace(min(x), max(x), 50)
+ # plt.plot(xnew, power_smooth, 'b-', label='iops')
+ ax1.plot(x, iops, 'b*')
+
+ for degree in (3,):
+ c = chebfit(x, iops, degree)
+ vals = chebval(xnew, c)
+ ax1.plot(xnew, vals, 'g--')
+
+ # ax1.set_xlabel('thread count')
+ # ax1.set_ylabel('iops')
+
+ # ax2 = ax1.twinx()
+ # lat = [i / 1000 for i in lat]
+ # ax2.plot(x, lat, 'r*')
+
+ # tck = splrep(x, lat, s=0.0)
+ # power_smooth = splev(xnew, tck)
+ # ax2.plot(xnew, power_smooth, 'r-', label='lat')
+
+ # xp = xnew[0]
+ # yp = power_smooth[0]
+ # for _x, _y in zip(xnew[1:], power_smooth[1:]):
+ # if _y >= 100:
+ # xres = (_y - 100.) / (_y - yp) * (_x - xp) + xp
+ # ax2.plot([xres, xres], [min(power_smooth), max(power_smooth)], 'g--')
+ # break
+ # xp = _x
+ # yp = _y
+
+ # ax2.plot([min(x), max(x)], [20, 20], 'g--')
+ # ax2.plot([min(x), max(x)], [100, 100], 'g--')
+
+ # ax2.set_ylabel("lat ms")
+ # plt.legend(loc=2)
+
+
+def main(argv):
+ data = list(load_data(open(argv[1]).read()))
+ linearity_table(data, ["rwd", "rws", "rrd"], [4096, 4096*1024])
+ # linearity_plot(data, ["rwd", "rws", "rrd"])#, [4096, 4096*1024])
+ # linearity_plot(data, ["rws", "rwd"])
+ # th_plot(data, 'rws')
+ # th_plot(data, 'rrs')
+ plt.show()
+
+
+if __name__ == "__main__":
+    sys.exit(main(sys.argv))
diff --git a/wally/charts.py b/wally/charts.py
new file mode 100644
index 0000000..a168f9d
--- /dev/null
+++ b/wally/charts.py
@@ -0,0 +1,145 @@
+import os
+import sys
+import hashlib
+
+from GChartWrapper import Line
+from GChartWrapper import constants
+from GChartWrapper import VerticalBarGroup
+
+from config import cfg_dict
+
+
+# Patch MARKER constant
+constants.MARKERS += 'E'
+sys.modules['GChartWrapper.GChart'].MARKERS += 'E'
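+# ('E' is the error-bar marker type, used below to draw the deviation
+#  series; GChartWrapper does not list it among allowed markers by default)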
+
+
+COLORS = ["1569C7", "81D8D0", "307D7E", "5CB3FF", "0040FF", "81DAF5"]
+
+
+def get_top_top_dir(path):
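+    # return path relative to its grand-parent directory
+    # (i.e. the last two path components)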
+ top_top_dir = os.path.dirname(os.path.dirname(path))
+ return path[len(top_top_dir) + 1:]
+
+
+def render_vertical_bar(title, legend, bars_data, bars_dev_top,
+ bars_dev_bottom, file_name,
+ width=700, height=400,
+ scale_x=None, scale_y=None, label_x=None,
+ label_y=None, lines=()):
+ """
+ Renders vertical bar group chart
+
+ :param legend - list of legend values.
+ Example: ['bar1', 'bar2', 'bar3']
+ :param dataset - list of values for each type (value, deviation)
+ Example:
+ [
+ [(10,1), (11, 2), (10,1)], # bar1 values
+ [(30,(29,33)),(35,(33,36)), (30,(29,33))], # bar2 values
+ [(20,(19,21)),(20,(13, 24)), (20,(19,21))] # bar 3 values
+ ]
+ :param width - width of chart
+ :param height - height of chart
+    :param scale_x - x axis scale
+    :param scale_y - y axis scale
+
+    :returns relative path to the saved chart image
+
+ dataset example:
+ {
+        'release_1': {
+            'randr': (1, 0.1),
+            'randwr': (2, 0.2)
+        },
+        'release_2': {
+ 'randr': (3, 0.3),
+ 'randwr': (4, 0.4)
+ }
+ }
+ """
+
+ bar = VerticalBarGroup([], encoding='text')
+ bar.title(title)
+
+ dataset = bars_data + bars_dev_top + bars_dev_bottom + \
+ [l[0] for l in lines]
+
+ bar.dataset(dataset, series=len(bars_data))
+ bar.axes.type('xyy')
+ bar.axes.label(2, None, label_x)
+
+ if scale_x:
+ bar.axes.label(0, *scale_x)
+
+ max_value = (max([max(l) for l in dataset[:2]]))
+ bar.axes.range(1, 0, max_value)
+ bar.axes.style(1, 'N*s*')
+ bar.axes.style(2, '000000', '13')
+
+ bar.scale(*[0, max_value] * 3)
+
+ bar.bar('r', '.1', '1')
+ for i in range(1):
+ bar.marker('E', '000000', '%s:%s' % ((len(bars_data) + i*2), i),
+ '', '1:10')
+ bar.color(*COLORS)
+ bar.size(width, height)
+
+ axes_type = "xyy"
+
+ scale = [0, max_value] * len(bars_dev_top + bars_dev_bottom + bars_data)
+ if lines:
+ line_n = 0
+ for data, label, axe, leg in lines:
+ bar.marker('D', COLORS[len(bars_data) + line_n],
+ (len(bars_data + bars_dev_top + bars_dev_bottom))
+ + line_n, 0, 3)
+ # max_val_l = max(data)
+ if axe:
+ max_val_l = max(data)
+ bar.axes.type(axes_type + axe)
+ bar.axes.range(len(axes_type), 0, max_val_l)
+ bar.axes.style(len(axes_type), 'N*s*')
+ bar.axes.label(len(axes_type) + 1, None, label)
+ bar.axes.style(len(axes_type) + 1, '000000', '13')
+ axes_type += axe
+ line_n += 1
+ scale += [0, max_val_l]
+ else:
+ scale += [0, max_value]
+ legend.append(leg)
+ # scale += [0, max_val_l]
+
+ bar.legend(*legend)
+ bar.scale(*scale)
+ img_name = file_name + ".png"
+ img_path = os.path.join(cfg_dict['charts_img_path'], img_name)
+
+ if not os.path.exists(img_path):
+ bar.save(img_path)
+
+ return get_top_top_dir(img_path)
+
+
+def render_lines(title, legend, dataset, scale_x, width=700, height=400):
+ line = Line([], encoding="text")
+ line.title(title)
+ line.dataset(dataset)
+
+ line.axes('xy')
+ max_value = (max([max(l) for l in dataset]))
+ line.axes.range(1, 0, max_value)
+ line.scale(0, max_value)
+ line.axes.label(0, *scale_x)
+ line.legend(*legend)
+ line.color(*COLORS[:len(legend)])
+ line.size(width, height)
+
+ img_name = hashlib.md5(str(line)).hexdigest() + ".png"
+ img_path = os.path.join(cfg_dict['charts_img_path'], img_name)
+ if not os.path.exists(img_path):
+ line.save(img_path)
+
+ return get_top_top_dir(img_path)
diff --git a/wally/config.py b/wally/config.py
new file mode 100644
index 0000000..03b7ac9
--- /dev/null
+++ b/wally/config.py
@@ -0,0 +1,129 @@
+import os
+import uuid
+import logging
+import functools
+
+import yaml
+
+try:
+ from petname import Generate as pet_generate
+except ImportError:
+ def pet_generate(x, y):
+ return str(uuid.uuid4())
+
+
+cfg_dict = {}
+
+
+def mkdirs_if_unxists(path):
+ if not os.path.exists(path):
+ os.makedirs(path)
+
+
+def load_config(file_name, explicit_folder=None):
+ first_load = len(cfg_dict) == 0
+ cfg_dict.update(yaml.load(open(file_name).read()))
+
+ if first_load:
+ var_dir = cfg_dict.get('internal', {}).get('var_dir_root', '/tmp')
+
+ run_uuid = None
+ if explicit_folder is None:
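+            # pick an unused human-readable name for the results dir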
+ for i in range(10):
+ run_uuid = pet_generate(2, "_")
+ results_dir = os.path.join(var_dir, run_uuid)
+ if not os.path.exists(results_dir):
+ break
+ else:
+ run_uuid = str(uuid.uuid4())
+ results_dir = os.path.join(var_dir, run_uuid)
+        else:
+            results_dir = explicit_folder
+            run_uuid = os.path.basename(results_dir)
+
+ cfg_dict['var_dir'] = results_dir
+ cfg_dict['run_uuid'] = run_uuid.replace('_', '-')
+ mkdirs_if_unxists(cfg_dict['var_dir'])
+
+ in_var_dir = functools.partial(os.path.join, cfg_dict['var_dir'])
+
+ cfg_dict['charts_img_path'] = in_var_dir('charts')
+ mkdirs_if_unxists(cfg_dict['charts_img_path'])
+
+ cfg_dict['vm_ids_fname'] = in_var_dir('os_vm_ids')
+ cfg_dict['html_report_file'] = in_var_dir('report.html')
+ cfg_dict['text_report_file'] = in_var_dir('report.txt')
+ cfg_dict['log_file'] = in_var_dir('log.txt')
+ cfg_dict['sensor_storage'] = in_var_dir('sensor_storage.txt')
+
+ cfg_dict['test_log_directory'] = in_var_dir('test_logs')
+ mkdirs_if_unxists(cfg_dict['test_log_directory'])
+
+
+def color_me(color):
+ RESET_SEQ = "\033[0m"
+ COLOR_SEQ = "\033[1;%dm"
+
+ color_seq = COLOR_SEQ % (30 + color)
+
+ def closure(msg):
+ return color_seq + msg + RESET_SEQ
+ return closure
+
+
+class ColoredFormatter(logging.Formatter):
+ BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
+
+ colors = {
+ 'WARNING': color_me(YELLOW),
+ 'DEBUG': color_me(BLUE),
+ 'CRITICAL': color_me(YELLOW),
+ 'ERROR': color_me(RED)
+ }
+
+ def __init__(self, msg, use_color=True, datefmt=None):
+ logging.Formatter.__init__(self, msg, datefmt=datefmt)
+ self.use_color = use_color
+
+ def format(self, record):
+ orig = record.__dict__
+ record.__dict__ = record.__dict__.copy()
+ levelname = record.levelname
+
+ prn_name = levelname + ' ' * (8 - len(levelname))
+ if levelname in self.colors:
+ record.levelname = self.colors[levelname](prn_name)
+ else:
+ record.levelname = prn_name
+
+ res = super(ColoredFormatter, self).format(record)
+
+ # restore record, as it will be used by other formatters
+ record.__dict__ = orig
+ return res
+
+
+def setup_loggers(def_level=logging.DEBUG, log_fname=None):
+ logger = logging.getLogger('wally')
+ logger.setLevel(logging.DEBUG)
+ sh = logging.StreamHandler()
+ sh.setLevel(def_level)
+
+ log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
+ colored_formatter = ColoredFormatter(log_format, datefmt="%H:%M:%S")
+
+ sh.setFormatter(colored_formatter)
+ logger.addHandler(sh)
+
+ logger_api = logging.getLogger("wally.fuel_api")
+
+ if log_fname is not None:
+ fh = logging.FileHandler(log_fname)
+ log_format = '%(asctime)s - %(levelname)8s - %(name)s - %(message)s'
+ formatter = logging.Formatter(log_format, datefmt="%H:%M:%S")
+ fh.setFormatter(formatter)
+ fh.setLevel(logging.DEBUG)
+ logger.addHandler(fh)
+ logger_api.addHandler(fh)
+
+ logger_api.addHandler(sh)
+ logger_api.setLevel(logging.WARNING)
diff --git a/wally/discover/__init__.py b/wally/discover/__init__.py
new file mode 100644
index 0000000..b02809c
--- /dev/null
+++ b/wally/discover/__init__.py
@@ -0,0 +1,5 @@
+"this package contains node discovery code"
+from .discover import discover, undiscover
+from .node import Node
+
+__all__ = ["discover", "Node", "undiscover"]
diff --git a/wally/discover/ceph.py b/wally/discover/ceph.py
new file mode 100644
index 0000000..70a6edf
--- /dev/null
+++ b/wally/discover/ceph.py
@@ -0,0 +1,89 @@
+""" Collect data about ceph nodes"""
+import json
+import logging
+import subprocess
+
+
+from .node import Node
+from wally.ssh_utils import connect
+
+
+logger = logging.getLogger("wally.discover")
+
+
+def local_execute(cmd):
+ return subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True)
+
+
+def ssh_execute(ssh):
+ def closure(cmd):
+ _, chan, _ = ssh.exec_command(cmd)
+ return chan.read()
+ return closure
+
+
+def discover_ceph_nodes(ip):
+    """ Return list of Node objects for all hosts in the ceph cluster """
+ ips = {}
+
+ if ip != 'local':
+ executor = ssh_execute(connect(ip))
+ else:
+ executor = local_execute
+
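+    # collect osd/mon/mds ips; one host may carry several ceph roles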
+ osd_ips = get_osds_ips(executor, get_osds_list(executor))
+ mon_ips = get_mons_or_mds_ips(executor, "mon")
+ mds_ips = get_mons_or_mds_ips(executor, "mds")
+
+ for ip in osd_ips:
+ url = "ssh://%s" % ip
+ ips.setdefault(url, []).append("ceph-osd")
+
+ for ip in mon_ips:
+ url = "ssh://%s" % ip
+ ips.setdefault(url, []).append("ceph-mon")
+
+ for ip in mds_ips:
+ url = "ssh://%s" % ip
+ ips.setdefault(url, []).append("ceph-mds")
+
+ return [Node(url, list(roles)) for url, roles in ips.items()]
+
+
+def get_osds_list(executor):
+ """ Get list of osds id"""
+ return filter(None, executor("ceph osd ls").split("\n"))
+
+
+def get_mons_or_mds_ips(executor, who):
+    """ Return set of mon or mds ips
+    :param who - "mon" or "mds" """
+    if who not in ("mon", "mds"):
+        raise ValueError(("'%s' passed to get_mons_or_mds_ips instead "
+                          "of 'mon'/'mds'") % who)
+
+ line_res = executor("ceph {0} dump".format(who)).split("\n")
+
+ ips = set()
+ for line in line_res:
+ fields = line.split()
+
+        # TODO: what do fields[1] and fields[2] mean?
+        # make this code look like:
+        # SOME_MEANINGFUL_VAR1, SOME_MEANINGFUL_VAR2 = line.split()[1:3]
+
+ if len(fields) > 2 and who in fields[2]:
+ ips.add(fields[1].split(":")[0])
+
+ return ips
+
+
+def get_osds_ips(executor, osd_list):
+ """ Get osd's ips
+ :param osd_list - list of osd names from osd ls command"""
+ ips = set()
+ for osd_id in osd_list:
+ out = executor("ceph osd find {0}".format(osd_id))
+ ip = json.loads(out)["ip"]
+ ips.add(str(ip.split(":")[0]))
+ return ips
diff --git a/wally/discover/discover.py b/wally/discover/discover.py
new file mode 100644
index 0000000..8c78387
--- /dev/null
+++ b/wally/discover/discover.py
@@ -0,0 +1,64 @@
+import logging
+
+from . import ceph
+from . import fuel
+from . import openstack
+from wally.utils import parse_creds
+
+
+logger = logging.getLogger("wally.discover")
+
+
+def discover(ctx, discover_list, clusters_info, var_dir):
+ nodes_to_run = []
+ clean_data = None
+    for cluster in discover_list:
+ if cluster == "openstack":
+ cluster_info = clusters_info["openstack"]
+            conn = cluster_info['connection']
+            if not conn:
+                logger.error("No connection provided for %s. Skipping"
+                             % cluster)
+                continue
+
+            user, passwd, tenant = parse_creds(conn['creds'])
+            auth_data = dict(
+                auth_url=conn['auth_url'],
+                username=user,
+                api_key=passwd,
+                project_id=tenant)
+
+ logger.debug("Discovering openstack nodes "
+ "with connection details: %r" %
+ conn)
+
+ os_nodes = openstack.discover_openstack_nodes(auth_data,
+ cluster_info)
+ nodes_to_run.extend(os_nodes)
+
+ elif cluster == "fuel":
+
+ res = fuel.discover_fuel_nodes(clusters_info['fuel'], var_dir)
+ nodes, clean_data, openrc_dict = res
+
+ ctx.fuel_openstack_creds = {'name': openrc_dict['username'],
+ 'passwd': openrc_dict['password'],
+ 'tenant': openrc_dict['tenant_name'],
+ 'auth_url': openrc_dict['os_auth_url']}
+
+ nodes_to_run.extend(nodes)
+
+ elif cluster == "ceph":
+ cluster_info = clusters_info["ceph"]
+ nodes_to_run.extend(ceph.discover_ceph_nodes(cluster_info))
+ else:
+ msg_templ = "Unknown cluster type in 'discover' parameter: {0!r}"
+ raise ValueError(msg_templ.format(cluster))
+
+ return nodes_to_run, clean_data
+
+
+def undiscover(clean_data):
+ if clean_data is not None:
+ fuel.clean_fuel_port_forwarding(clean_data)
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
new file mode 100644
index 0000000..6e76188
--- /dev/null
+++ b/wally/discover/fuel.py
@@ -0,0 +1,136 @@
+import os
+import re
+import sys
+import socket
+import logging
+from urlparse import urlparse
+
+import yaml
+from wally.fuel_rest_api import (KeystoneAuth, get_cluster_id,
+ reflect_cluster, FuelInfo)
+from wally.utils import parse_creds
+from wally.ssh_utils import run_over_ssh, connect
+
+from .node import Node
+
+
+logger = logging.getLogger("wally.discover")
+BASE_PF_PORT = 33467
+
+
+def discover_fuel_nodes(fuel_data, var_dir):
+ username, tenant_name, password = parse_creds(fuel_data['creds'])
+ creds = {"username": username,
+ "tenant_name": tenant_name,
+ "password": password}
+
+ conn = KeystoneAuth(fuel_data['url'], creds, headers=None)
+
+ cluster_id = get_cluster_id(conn, fuel_data['openstack_env'])
+ cluster = reflect_cluster(conn, cluster_id)
+ version = FuelInfo(conn).get_version()
+
+ fuel_nodes = list(cluster.get_nodes())
+
+ logger.debug("Found FUEL {0}".format("".join(map(str, version))))
+
+ network = 'fuelweb_admin' if version >= [6, 0] else 'admin'
+
+ ssh_creds = fuel_data['ssh_creds']
+
+ fuel_host = urlparse(fuel_data['url']).hostname
+ fuel_ip = socket.gethostbyname(fuel_host)
+    ssh_conn = connect("{0}@{1}".format(ssh_creds, fuel_host))
+
+ fuel_ext_iface = get_external_interface(ssh_conn, fuel_ip)
+
+ # TODO: keep ssh key in memory
+ # http://stackoverflow.com/questions/11994139/how-to-include-the-private-key-in-paramiko-after-fetching-from-string
+ fuel_key_file = os.path.join(var_dir, "fuel_master_node_id_rsa")
+ download_master_key(ssh_conn, fuel_key_file)
+
+ nodes = []
+ ports = range(BASE_PF_PORT, BASE_PF_PORT + len(fuel_nodes))
+ ips_ports = []
+
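+    # slave nodes sit on the fuel admin network, so ssh to them is
+    # tunnelled through the master node via iptables DNAT port forwarding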
+ for fuel_node, port in zip(fuel_nodes, ports):
+ ip = fuel_node.get_ip(network)
+ forward_ssh_port(ssh_conn, fuel_ext_iface, port, ip)
+
+ conn_url = "ssh://root@{0}:{1}:{2}".format(fuel_host,
+ port,
+ fuel_key_file)
+ nodes.append(Node(conn_url, fuel_node['roles']))
+ ips_ports.append((ip, port))
+
+ logger.debug("Found %s fuel nodes for env %r" %
+ (len(nodes), fuel_data['openstack_env']))
+
+ return (nodes,
+ (ssh_conn, fuel_ext_iface, ips_ports),
+ cluster.get_openrc())
+
+
+def download_master_key(conn, dest):
+ # download master key
+ sftp = conn.open_sftp()
+ sftp.get('/root/.ssh/id_rsa', dest)
+ os.chmod(dest, 0o400)
+ sftp.close()
+
+ logger.debug("Fuel master key stored in {0}".format(dest))
+
+
+def get_external_interface(conn, ip):
+ data = run_over_ssh(conn, "ip a", node='fuel-master')
+ curr_iface = None
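+    # scan "ip a" output: remember the latest interface name and return it
+    # once an inet line with the requested ip is found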
+ for line in data.split("\n"):
+
+ match1 = re.match(r"\d+:\s+(?P<name>.*?):\s\<", line)
+ if match1 is not None:
+ curr_iface = match1.group('name')
+
+ match2 = re.match(r"\s+inet\s+(?P<ip>[0-9.]+)/", line)
+ if match2 is not None:
+ if match2.group('ip') == ip:
+ assert curr_iface is not None
+ return curr_iface
+    raise KeyError("Can't find interface for ip {0}".format(ip))
+
+
+def forward_ssh_port(conn, iface, new_port, ip, clean=False):
+ mode = "-D" if clean is True else "-A"
+ cmd = "iptables -t nat {mode} PREROUTING -p tcp " + \
+ "-i {iface} --dport {port} -j DNAT --to {ip}:22"
+ run_over_ssh(conn,
+ cmd.format(iface=iface, port=new_port, ip=ip, mode=mode),
+ node='fuel-master')
+
+
+def clean_fuel_port_forwarding(clean_data):
+ conn, iface, ips_ports = clean_data
+ for ip, port in ips_ports:
+ forward_ssh_port(conn, iface, port, ip, clean=True)
+
+
+def main(argv):
+ fuel_data = yaml.load(open(sys.argv[1]).read())['clouds']['fuel']
+ nodes, to_clean, openrc = discover_fuel_nodes(fuel_data, '/tmp')
+
+ print nodes
+ print openrc
+ print "Ready to test"
+
+ sys.stdin.readline()
+
+ clean_fuel_port_forwarding(to_clean)
+
+ return 0
+
+
+if __name__ == "__main__":
+ main(sys.argv[1:])
diff --git a/wally/discover/node.py b/wally/discover/node.py
new file mode 100644
index 0000000..fad4f29
--- /dev/null
+++ b/wally/discover/node.py
@@ -0,0 +1,22 @@
+import urlparse
+
+
+class Node(object):
+
+ def __init__(self, conn_url, roles):
+ self.roles = roles
+ self.conn_url = conn_url
+ self.connection = None
+
+ def get_ip(self):
+ return urlparse.urlparse(self.conn_url).hostname
+
+ def __str__(self):
+ templ = "<Node: url={conn_url!r} roles={roles}" + \
+ " connected={is_connected}>"
+ return templ.format(conn_url=self.conn_url,
+ roles=", ".join(self.roles),
+ is_connected=self.connection is not None)
+
+ def __repr__(self):
+ return str(self)
diff --git a/wally/discover/openstack.py b/wally/discover/openstack.py
new file mode 100644
index 0000000..32b4629
--- /dev/null
+++ b/wally/discover/openstack.py
@@ -0,0 +1,70 @@
+import socket
+import logging
+
+
+from novaclient.client import Client
+
+from .node import Node
+from wally.utils import parse_creds
+
+
+logger = logging.getLogger("wally.discover")
+
+
+def get_floating_ip(vm):
+ addrs = vm.addresses
+ for net_name, ifaces in addrs.items():
+ for iface in ifaces:
+ if iface.get('OS-EXT-IPS:type') == "floating":
+ return iface['addr']
+
+
+def discover_vms(client, search_opts):
+ user, password, key = parse_creds(search_opts.pop('auth'))
+
+ servers = client.servers.list(search_opts=search_opts)
+ logger.debug("Found %s openstack vms" % len(servers))
+ return [Node(get_floating_ip(server), ["test_vm"], username=user,
+ password=password, key_path=key)
+ for server in servers if get_floating_ip(server)]
+
+
+def discover_services(client, opts):
+ user, password, key = parse_creds(opts.pop('auth'))
+
+ services = []
+ if opts['service'] == "all":
+ services = client.services.list()
+ else:
+ if isinstance(opts['service'], basestring):
+ opts['service'] = [opts['service']]
+
+ for s in opts['service']:
+ services.extend(client.services.list(binary=s))
+
+ host_services_mapping = {}
+
+ for service in services:
+ ip = socket.gethostbyname(service.host)
+        host_services_mapping.setdefault(ip, []).append(service.binary)
+
+ logger.debug("Found %s openstack service nodes" %
+ len(host_services_mapping))
+ return [Node(host, services, username=user,
+ password=password, key_path=key) for host, services in
+ host_services_mapping.items()]
+
+
+def discover_openstack_nodes(conn_details, conf):
+ """Discover vms running in openstack
+ :param conn_details - dict with openstack connection details -
+ auth_url, api_key (password), username
+ """
+ client = Client(version='1.1', **conn_details)
+ nodes = []
+ if conf.get('discover'):
+ services_to_discover = conf['discover'].get('nodes')
+ if services_to_discover:
+ nodes.extend(discover_services(client, services_to_discover))
+
+ return nodes
diff --git a/wally/fuel_rest_api.py b/wally/fuel_rest_api.py
new file mode 100644
index 0000000..499510d
--- /dev/null
+++ b/wally/fuel_rest_api.py
@@ -0,0 +1,540 @@
+import re
+import json
+import time
+import logging
+import urllib2
+import urlparse
+from functools import partial, wraps
+
+import netaddr
+
+from keystoneclient.v2_0 import Client as keystoneclient
+from keystoneclient import exceptions
+
+
+logger = logging.getLogger("wally.fuel_api")
+
+
+class Urllib2HTTP(object):
+ """
+ class for making HTTP requests
+ """
+
+ allowed_methods = ('get', 'put', 'post', 'delete', 'patch', 'head')
+
+ def __init__(self, root_url, headers=None):
+        """
+        :param root_url: service root url, trailing '/' is stripped
+        :param headers: dict of default HTTP headers
+        """
+ if root_url.endswith('/'):
+ self.root_url = root_url[:-1]
+ else:
+ self.root_url = root_url
+
+ self.headers = headers if headers is not None else {}
+
+ def host(self):
+ return self.root_url.split('/')[2]
+
+ def do(self, method, path, params=None):
+ if path.startswith('/'):
+ url = self.root_url + path
+ else:
+ url = self.root_url + '/' + path
+
+ if method == 'get':
+ assert params == {} or params is None
+ data_json = None
+ else:
+ data_json = json.dumps(params)
+
+ logger.debug("HTTP: {} {}".format(method.upper(), url))
+
+ request = urllib2.Request(url,
+ data=data_json,
+ headers=self.headers)
+ if data_json is not None:
+ request.add_header('Content-Type', 'application/json')
+
+ request.get_method = lambda: method.upper()
+ response = urllib2.urlopen(request)
+
+        logger.debug("HTTP Response: {}".format(response.code))
+
+ if response.code < 200 or response.code > 209:
+ raise IndexError(url)
+
+ content = response.read()
+
+ if '' == content:
+ return None
+
+ return json.loads(content)
+
+ def __getattr__(self, name):
+ if name in self.allowed_methods:
+ return partial(self.do, name)
+ raise AttributeError(name)
+
+
+class KeystoneAuth(Urllib2HTTP):
+ def __init__(self, root_url, creds, headers=None):
+ super(KeystoneAuth, self).__init__(root_url, headers)
+ admin_node_ip = urlparse.urlparse(root_url).hostname
+ self.keystone_url = "http://{0}:5000/v2.0".format(admin_node_ip)
+ self.keystone = keystoneclient(
+ auth_url=self.keystone_url, **creds)
+ self.refresh_token()
+
+ def refresh_token(self):
+ """Get new token from keystone and update headers"""
+ try:
+ self.keystone.authenticate()
+ self.headers['X-Auth-Token'] = self.keystone.auth_token
+ except exceptions.AuthorizationFailure:
+ logger.warning(
+                "Can't establish connection to keystone with url %s",
+ self.keystone_url)
+
+ def do(self, method, path, params=None):
+ """Do request. If gets 401 refresh token"""
+ try:
+ return super(KeystoneAuth, self).do(method, path, params)
+ except urllib2.HTTPError as e:
+ if e.code == 401:
+ logger.warning(
+ 'Authorization failure: {0}'.format(e.read()))
+ self.refresh_token()
+ return super(KeystoneAuth, self).do(method, path, params)
+ else:
+ raise
+
+
+def get_inline_param_list(url):
+ format_param_rr = re.compile(r"\{([a-zA-Z_]+)\}")
+ for match in format_param_rr.finditer(url):
+ yield match.group(1)
+
+
+class RestObj(object):
+ name = None
+ id = None
+
+ def __init__(self, conn, **kwargs):
+ self.__dict__.update(kwargs)
+ self.__connection__ = conn
+
+ def __str__(self):
+ res = ["{}({}):".format(self.__class__.__name__, self.name)]
+ for k, v in sorted(self.__dict__.items()):
+ if k.startswith('__') or k.endswith('__'):
+ continue
+ if k != 'name':
+ res.append(" {}={!r}".format(k, v))
+ return "\n".join(res)
+
+ def __getitem__(self, item):
+ return getattr(self, item)
+
+
+def make_call(method, url):
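+    """Build a REST call method: fill {name} placeholders in url from
+    call kwargs (or from object attributes) and send the request via
+    the object's __connection__"""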
+ def closure(obj, entire_obj=None, **data):
+ inline_params_vals = {}
+ for name in get_inline_param_list(url):
+ if name in data:
+ inline_params_vals[name] = data[name]
+ del data[name]
+ else:
+ inline_params_vals[name] = getattr(obj, name)
+ result_url = url.format(**inline_params_vals)
+
+ if entire_obj is not None:
+ if data != {}:
+ raise ValueError("Both entire_obj and data provided")
+ data = entire_obj
+ return obj.__connection__.do(method, result_url, params=data)
+ return closure
+
+
+PUT = partial(make_call, 'put')
+GET = partial(make_call, 'get')
+DELETE = partial(make_call, 'delete')
+
+
+def with_timeout(tout, message):
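+    """Decorator: call the wrapped function roughly once per second
+    until it returns a true value or tout seconds pass, then raise
+    RuntimeError mentioning message"""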
+ def closure(func):
+ @wraps(func)
+ def closure2(*dt, **mp):
+ ctime = time.time()
+ etime = ctime + tout
+
+ while ctime < etime:
+ if func(*dt, **mp):
+ return
+ sleep_time = ctime + 1 - time.time()
+ if sleep_time > 0:
+ time.sleep(sleep_time)
+ ctime = time.time()
+ raise RuntimeError("Timeout during " + message)
+ return closure2
+ return closure
+
+
+# ------------------------------- ORM ----------------------------------------
+
+
+def get_fuel_info(url):
+ conn = Urllib2HTTP(url)
+ return FuelInfo(conn)
+
+
+class FuelInfo(RestObj):
+
+ """Class represents Fuel installation info"""
+
+ get_nodes = GET('api/nodes')
+ get_clusters = GET('api/clusters')
+ get_cluster = GET('api/clusters/{id}')
+ get_info = GET('api/releases')
+
+ @property
+ def nodes(self):
+ """Get all fuel nodes"""
+ return NodeList([Node(self.__connection__, **node) for node
+ in self.get_nodes()])
+
+ @property
+ def free_nodes(self):
+ """Get unallocated nodes"""
+ return NodeList([Node(self.__connection__, **node) for node in
+ self.get_nodes() if not node['cluster']])
+
+ @property
+ def clusters(self):
+ """List clusters in fuel"""
+ return [Cluster(self.__connection__, **cluster) for cluster
+ in self.get_clusters()]
+
+ def get_version(self):
+ for info in self.get_info():
+ vers = info['version'].split("-")[1].split('.')
+ return map(int, vers)
+ raise ValueError("No version found")
+
+
+class Node(RestObj):
+ """Represents node in Fuel"""
+
+ get_info = GET('/api/nodes/{id}')
+ get_interfaces = GET('/api/nodes/{id}/interfaces')
+ update_interfaces = PUT('/api/nodes/{id}/interfaces')
+
+ def set_network_assigment(self, mapping):
+        """Assign networks to interfaces
+        :param mapping: dict {interface name: list of network names}
+        """
+
+ curr_interfaces = self.get_interfaces()
+
+ network_ids = {}
+ for interface in curr_interfaces:
+ for net in interface['assigned_networks']:
+ network_ids[net['name']] = net['id']
+
+ # transform mappings
+ new_assigned_networks = {}
+
+ for dev_name, networks in mapping.items():
+ new_assigned_networks[dev_name] = []
+ for net_name in networks:
+ nnet = {'name': net_name, 'id': network_ids[net_name]}
+ new_assigned_networks[dev_name].append(nnet)
+
+ # update by ref
+ for dev_descr in curr_interfaces:
+ if dev_descr['name'] in new_assigned_networks:
+ nass = new_assigned_networks[dev_descr['name']]
+ dev_descr['assigned_networks'] = nass
+
+ self.update_interfaces(curr_interfaces, id=self.id)
+
+ def set_node_name(self, name):
+ """Update node name"""
+ self.__connection__.put('api/nodes', [{'id': self.id, 'name': name}])
+
+ def get_network_data(self):
+ """Returns node network data"""
+ node_info = self.get_info()
+ return node_info.get('network_data')
+
+ def get_roles(self, pending=False):
+ """Get node roles
+
+ Returns: (roles, pending_roles)
+ """
+ node_info = self.get_info()
+ if pending:
+ return node_info.get('roles'), node_info.get('pending_roles')
+ else:
+ return node_info.get('roles')
+
+ def get_ip(self, network='public'):
+ """Get node ip
+
+ :param network: network to pick
+ """
+ nets = self.get_network_data()
+ for net in nets:
+ if net['name'] == network:
+ iface_name = net['dev']
+ for iface in self.get_info()['meta']['interfaces']:
+ if iface['name'] == iface_name:
+ try:
+ return iface['ip']
+ except KeyError:
+ return netaddr.IPNetwork(net['ip']).ip
+ raise Exception('Network %s not found' % network)
+
+
+class NodeList(list):
+ """Class for filtering nodes through attributes"""
+ allowed_roles = ['controller', 'compute', 'cinder', 'ceph-osd', 'mongo',
+ 'zabbix-server']
+
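+    # e.g. nodes.controller -> all nodes with the 'controller' role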
+ def __getattr__(self, name):
+ if name in self.allowed_roles:
+ return [node for node in self if name in node.roles]
+
+
+class Cluster(RestObj):
+ """Class represents Cluster in Fuel"""
+
+ add_node_call = PUT('api/nodes')
+ start_deploy = PUT('api/clusters/{id}/changes')
+ get_status = GET('api/clusters/{id}')
+ delete = DELETE('api/clusters/{id}')
+ get_tasks_status = GET("api/tasks?cluster_id={id}")
+ get_networks = GET(
+ 'api/clusters/{id}/network_configuration/{net_provider}')
+
+ get_attributes = GET(
+ 'api/clusters/{id}/attributes')
+
+ set_attributes = PUT(
+ 'api/clusters/{id}/attributes')
+
+ configure_networks = PUT(
+ 'api/clusters/{id}/network_configuration/{net_provider}')
+
+ _get_nodes = GET('api/nodes?cluster_id={id}')
+
+ def __init__(self, *dt, **mp):
+ super(Cluster, self).__init__(*dt, **mp)
+ self.nodes = NodeList([Node(self.__connection__, **node) for node in
+ self._get_nodes()])
+ self.network_roles = {}
+
+ def check_exists(self):
+ """Check if cluster exists"""
+ try:
+ self.get_status()
+ return True
+ except urllib2.HTTPError as err:
+ if err.code == 404:
+ return False
+ raise
+
+ def get_openrc(self):
+ access = self.get_attributes()['editable']['access']
+ creds = {}
+ creds['username'] = access['user']['value']
+ creds['password'] = access['password']['value']
+ creds['tenant_name'] = access['tenant']['value']
+ if self.nodes.controller:
+ contr = self.nodes.controller[0]
+ creds['os_auth_url'] = "http://%s:5000/v2.0" \
+ % contr.get_ip(network="public")
+ else:
+ creds['os_auth_url'] = ""
+ return creds
+
+ def get_nodes(self):
+ for node_descr in self._get_nodes():
+ yield Node(self.__connection__, **node_descr)
+
+ def add_node(self, node, roles, interfaces=None):
+ """Add node to cluster
+
+ :param node: Node object
+ :param roles: roles to assign
+ :param interfaces: mapping iface name to networks
+ """
+ data = {}
+ data['pending_roles'] = roles
+ data['cluster_id'] = self.id
+ data['id'] = node.id
+ data['pending_addition'] = True
+
+ logger.debug("Adding node %s to cluster..." % node.id)
+
+ self.add_node_call([data])
+ self.nodes.append(node)
+
+ if interfaces is not None:
+ networks = {}
+ for iface_name, params in interfaces.items():
+ networks[iface_name] = params['networks']
+
+ node.set_network_assigment(networks)
+
+ def wait_operational(self, timeout):
+ """Wait until cluster status operational"""
+ def wo():
+ status = self.get_status()['status']
+ if status == "error":
+ raise Exception("Cluster deploy failed")
+ return self.get_status()['status'] == 'operational'
+ with_timeout(timeout, "deploy cluster")(wo)()
+
+ def deploy(self, timeout):
+ """Start deploy and wait until all tasks finished"""
+ logger.debug("Starting deploy...")
+ self.start_deploy()
+
+ self.wait_operational(timeout)
+
+ def all_tasks_finished_ok(obj):
+ ok = True
+ for task in obj.get_tasks_status():
+ if task['status'] == 'error':
+ raise Exception('Task execution error')
+ elif task['status'] != 'ready':
+ ok = False
+ return ok
+
+ wto = with_timeout(timeout, "wait deployment finished")
+ wto(all_tasks_finished_ok)(self)
+
+ def set_networks(self, net_descriptions):
+ """Update cluster networking parameters"""
+ configuration = self.get_networks()
+ current_networks = configuration['networks']
+ parameters = configuration['networking_parameters']
+
+ if net_descriptions.get('networks'):
+ net_mapping = net_descriptions['networks']
+
+ for net in current_networks:
+ net_desc = net_mapping.get(net['name'])
+ if net_desc:
+ net.update(net_desc)
+
+ if net_descriptions.get('networking_parameters'):
+ parameters.update(net_descriptions['networking_parameters'])
+
+ self.configure_networks(**configuration)
+
+
+def reflect_cluster(conn, cluster_id):
+ """Returns cluster object by id"""
+ c = Cluster(conn, id=cluster_id)
+ c.nodes = NodeList(list(c.get_nodes()))
+ return c
+
+
+def get_all_nodes(conn):
+ """Get all nodes from Fuel"""
+ for node_desc in conn.get('api/nodes'):
+ yield Node(conn, **node_desc)
+
+
+def get_all_clusters(conn):
+ """Get all clusters"""
+ for cluster_desc in conn.get('api/clusters'):
+ yield Cluster(conn, **cluster_desc)
+
+
+def get_cluster_id(conn, name):
+ """Get cluster id by name"""
+ for cluster in get_all_clusters(conn):
+ if cluster.name == name:
+ return cluster.id
+
+ raise ValueError("Cluster {0} not found".format(name))
+
+
+sections = {
+ 'sahara': 'additional_components',
+ 'murano': 'additional_components',
+ 'ceilometer': 'additional_components',
+ 'volumes_ceph': 'storage',
+ 'images_ceph': 'storage',
+ 'ephemeral_ceph': 'storage',
+ 'objects_ceph': 'storage',
+ 'osd_pool_size': 'storage',
+ 'volumes_lvm': 'storage',
+ 'volumes_vmdk': 'storage',
+ 'tenant': 'access',
+ 'password': 'access',
+ 'user': 'access',
+ 'vc_password': 'vcenter',
+ 'cluster': 'vcenter',
+ 'host_ip': 'vcenter',
+ 'vc_user': 'vcenter',
+ 'use_vcenter': 'vcenter',
+}
+
+
+def create_empty_cluster(conn, cluster_desc, debug_mode=False):
+ """Create new cluster with configuration provided"""
+
+ data = {}
+ data['nodes'] = []
+ data['tasks'] = []
+ data['name'] = cluster_desc['name']
+ data['release'] = cluster_desc['release']
+ data['mode'] = cluster_desc.get('deployment_mode')
+ data['net_provider'] = cluster_desc.get('net_provider')
+
+ params = conn.post(path='/api/clusters', params=data)
+ cluster = Cluster(conn, **params)
+
+ attributes = cluster.get_attributes()
+
+ ed_attrs = attributes['editable']
+
+ ed_attrs['common']['libvirt_type']['value'] = \
+ cluster_desc.get('libvirt_type', 'kvm')
+
+ if 'nodes' in cluster_desc:
+ use_ceph = cluster_desc['nodes'].get('ceph_osd', None) is not None
+ else:
+ use_ceph = False
+
+ if 'storage_type' in cluster_desc:
+ st = cluster_desc['storage_type']
+ if st == 'ceph':
+ use_ceph = True
+ else:
+ use_ceph = False
+
+ if use_ceph:
+ opts = ['ephemeral_ceph', 'images_ceph', 'images_vcenter']
+ opts += ['iser', 'objects_ceph', 'volumes_ceph']
+ opts += ['volumes_lvm', 'volumes_vmdk']
+
+ for name in opts:
+ val = ed_attrs['storage'][name]
+ if val['type'] == 'checkbox':
+ is_ceph = ('images_ceph' == name)
+ is_ceph = is_ceph or ('volumes_ceph' == name)
+
+ if is_ceph:
+ val['value'] = True
+ else:
+ val['value'] = False
+ # else:
+ # raise NotImplementedError("Non-ceph storages are not implemented")
+
+ cluster.set_attributes(attributes)
+
+ return cluster
diff --git a/wally/keystone.py b/wally/keystone.py
new file mode 100644
index 0000000..24d322c
--- /dev/null
+++ b/wally/keystone.py
@@ -0,0 +1,90 @@
+import json
+import urllib2
+from functools import partial
+
+from keystoneclient import exceptions
+from keystoneclient.v2_0 import Client as keystoneclient
+
+
+class Urllib2HTTP(object):
+ """
+ class for making HTTP requests
+ """
+
+ allowed_methods = ('get', 'put', 'post', 'delete', 'patch', 'head')
+
+ def __init__(self, root_url, headers=None, echo=False):
+        """
+        :param root_url: service root url, trailing '/' is stripped
+        :param headers: dict of default HTTP headers
+        :param echo: debug flag, stored but not used yet
+        """
+ if root_url.endswith('/'):
+ self.root_url = root_url[:-1]
+ else:
+ self.root_url = root_url
+
+ self.headers = headers if headers is not None else {}
+ self.echo = echo
+
+ def do(self, method, path, params=None):
+ if path.startswith('/'):
+ url = self.root_url + path
+ else:
+ url = self.root_url + '/' + path
+
+ if method == 'get':
+ assert params == {} or params is None
+ data_json = None
+ else:
+ data_json = json.dumps(params)
+
+ request = urllib2.Request(url,
+ data=data_json,
+ headers=self.headers)
+ if data_json is not None:
+ request.add_header('Content-Type', 'application/json')
+
+ request.get_method = lambda: method.upper()
+ response = urllib2.urlopen(request)
+
+ if response.code < 200 or response.code > 209:
+ raise IndexError(url)
+
+ content = response.read()
+
+ if '' == content:
+ return None
+
+ return json.loads(content)
+
+ def __getattr__(self, name):
+ if name in self.allowed_methods:
+ return partial(self.do, name)
+ raise AttributeError(name)
+
+
+class KeystoneAuth(Urllib2HTTP):
+ def __init__(self, root_url, creds, headers=None, echo=False,
+ admin_node_ip=None):
+ super(KeystoneAuth, self).__init__(root_url, headers, echo)
+ self.keystone_url = "http://{0}:5000/v2.0".format(admin_node_ip)
+ self.keystone = keystoneclient(
+ auth_url=self.keystone_url, **creds)
+ self.refresh_token()
+
+ def refresh_token(self):
+ """Get new token from keystone and update headers"""
+ try:
+ self.keystone.authenticate()
+ self.headers['X-Auth-Token'] = self.keystone.auth_token
+ except exceptions.AuthorizationFailure:
+ raise
+
+ def do(self, method, path, params=None):
+ """Do request. If gets 401 refresh token"""
+ try:
+ return super(KeystoneAuth, self).do(method, path, params)
+ except urllib2.HTTPError as e:
+ if e.code == 401:
+ self.refresh_token()
+ return super(KeystoneAuth, self).do(method, path, params)
+ else:
+ raise
diff --git a/wally/meta_info.py b/wally/meta_info.py
new file mode 100644
index 0000000..127612b
--- /dev/null
+++ b/wally/meta_info.py
@@ -0,0 +1,68 @@
+from urlparse import urlparse
+from wally.keystone import KeystoneAuth
+
+
+def total_lab_info(data):
+ lab_data = {}
+ lab_data['nodes_count'] = len(data['nodes'])
+ lab_data['total_memory'] = 0
+ lab_data['total_disk'] = 0
+ lab_data['processor_count'] = 0
+
+ for node in data['nodes']:
+ lab_data['total_memory'] += node['memory']['total']
+ lab_data['processor_count'] += len(node['processors'])
+
+ for disk in node['disks']:
+ lab_data['total_disk'] += disk['size']
+
+ def to_gb(x):
+ return x / (1024 ** 3)
+
+ lab_data['total_memory'] = format(to_gb(lab_data['total_memory']), ',d')
+ lab_data['total_disk'] = format(to_gb(lab_data['total_disk']), ',d')
+ return lab_data
+
+
+def collect_lab_data(url, cred):
+ u = urlparse(url)
+ keystone = KeystoneAuth(root_url=url, creds=cred, admin_node_ip=u.hostname)
+ lab_info = keystone.do(method='get', path="/api/nodes")
+ fuel_version = keystone.do(method='get', path="/api/version/")
+
+ nodes = []
+ result = {}
+
+ for node in lab_info:
+        node_info = {}
+        node_info['name'] = node['name']
+        processors = []
+        interfaces = []
+        disks = []
+        devices = []
+
+        for processor in node['meta']['cpu']['spec']:
+            processors.append(processor)
+
+        for iface in node['meta']['interfaces']:
+            interfaces.append(iface)
+
+        memory = node['meta']['memory'].copy()
+
+        for disk in node['meta']['disks']:
+            disks.append(disk)
+
+        node_info['memory'] = memory
+        node_info['disks'] = disks
+        node_info['devices'] = devices
+        node_info['interfaces'] = interfaces
+        node_info['processors'] = processors
+
+        nodes.append(node_info)
+
+ result['nodes'] = nodes
+ # result['name'] = 'Perf-1 Env'
+ result['fuel_version'] = fuel_version['release']
+
+ return result
diff --git a/wally/pretty_yaml.py b/wally/pretty_yaml.py
new file mode 100644
index 0000000..4fc4a49
--- /dev/null
+++ b/wally/pretty_yaml.py
@@ -0,0 +1,79 @@
+__doc__ = "functions for making pretty yaml files"
+__all__ = ['dumps']
+
+
+def dumps_simple(val):
+ bad_symbols = set(" \r\t\n,':")
+
+ if isinstance(val, basestring):
+ if len(bad_symbols & set(val)) != 0:
+ return repr(val)
+ return val
+ elif val is True:
+ return 'true'
+ elif val is False:
+ return 'false'
+ elif val is None:
+ return 'null'
+
+ return str(val)
+
+
+def is_simple(val):
+ simple_type = isinstance(val, (str, unicode, int, long, bool, float))
+ return simple_type or val is None
+
+
+def all_nums(vals):
+ return all(isinstance(val, (int, float, long)) for val in vals)
+
+
+def dumpv(data, tab_sz=4, width=120, min_width=40):
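+    """Return a list of output lines for data, switching nested
+    collections to multi-line form when one line would exceed width"""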
+ tab = ' ' * tab_sz
+
+ if width < min_width:
+ width = min_width
+
+ res = []
+ if is_simple(data):
+ return [dumps_simple(data)]
+
+ if isinstance(data, (list, tuple)):
+ if all(map(is_simple, data)):
+ if all_nums(data):
+ one_line = "[{}]".format(", ".join(map(dumps_simple, data)))
+ else:
+ one_line = "[{}]".format(",".join(map(dumps_simple, data)))
+ else:
+ one_line = None
+
+ if one_line is None or len(one_line) > width:
+ pref = "-" + ' ' * (tab_sz - 1)
+
+ for val in data:
+ items = dumpv(val, tab_sz, width - tab_sz, min_width)
+ items = [pref + items[0]] + \
+ [tab + item for item in items[1:]]
+ res.extend(items)
+ else:
+ res.append(one_line)
+ elif isinstance(data, dict):
+ assert all(map(is_simple, data.keys()))
+
+ for k, v in data.items():
+ key_str = dumps_simple(k) + ": "
+ val_res = dumpv(v, tab_sz, width - tab_sz, min_width)
+
+ if len(val_res) == 1 and len(key_str + val_res[0]) < width:
+ res.append(key_str + val_res[0])
+ else:
+ res.append(key_str)
+ res.extend(tab + i for i in val_res)
+ else:
+ raise ValueError("Can't pack {0!r}".format(data))
+
+ return res
+
+
+def dumps(data, tab_sz=4, width=120, min_width=40):
+ return "\n".join(dumpv(data, tab_sz, width, min_width))
diff --git a/wally/report.py b/wally/report.py
new file mode 100644
index 0000000..d2f2d96
--- /dev/null
+++ b/wally/report.py
@@ -0,0 +1,236 @@
+import os
+import sys
+
+from wally import charts
+from wally.statistic import med_dev
+from wally.utils import parse_creds
+from wally.suits.io.results_loader import filter_data
+from wally.meta_info import total_lab_info, collect_lab_data
+
+
+# from collections import OrderedDict
+# from wally.suits.io import formatter
+# def pgbench_chart_data(results):
+# """
+# Format pgbench results for chart
+# """
+# data = {}
+# charts_url = []
+
+# formatted_res = formatters.format_pgbench_stat(results)
+# for key, value in formatted_res.items():
+# num_cl, num_tr = key.split(' ')
+# data.setdefault(num_cl, {}).setdefault(build, {})
+# data[keys[z]][build][
+# ' '.join(keys)] = value
+
+# for name, value in data.items():
+# title = name
+# legend = []
+# dataset = []
+
+# scale_x = []
+
+# for build_id, build_results in value.items():
+# vals = []
+# OD = OrderedDict
+# ordered_build_results = OD(sorted(build_results.items(),
+# key=lambda t: t[0]))
+# scale_x = ordered_build_results.keys()
+# for key in scale_x:
+# res = build_results.get(key)
+# if res:
+# vals.append(res)
+# if vals:
+# dataset.append(vals)
+# legend.append(build_id)
+
+# if dataset:
+# charts_url.append(str(charts.render_vertical_bar
+# (title, legend, dataset, scale_x=scale_x)))
+# return charts_url
+
+# def build_lines_chart(results, z=0):
+# data = {}
+# charts_url = []
+
+# for build, res in results:
+# formatted_res = formatters.get_formatter(build)(res)
+# for key, value in formatted_res.items():
+# keys = key.split(' ')
+# data.setdefault(key[z], {})
+# data[key[z]].setdefault(build, {})[keys[1]] = value
+
+# for name, value in data.items():
+# title = name
+# legend = []
+# dataset = []
+# scale_x = []
+# for build_id, build_results in value.items():
+# legend.append(build_id)
+
+# OD = OrderedDict
+# ordered_build_results = OD(sorted(build_results.items(),
+# key=lambda t: ssize_to_b(t[0])))
+
+# if not scale_x:
+# scale_x = ordered_build_results.keys()
+# dataset.append(zip(*ordered_build_results.values())[0])
+
+# chart = charts.render_lines(title, legend, dataset, scale_x)
+# charts_url.append(str(chart))
+
+# return charts_url
+
+# def build_vertical_bar(results, z=0):
+# data = {}
+# charts_url = []
+# for build, res in results:
+# formatted_res = formatter.get_formatter(build)(res)
+# for key, value in formatted_res.items():
+# keys = key.split(' ')
+# data.setdefault(keys[z], {}).setdefault(build, {})
+# data[keys[z]][build][
+# ' '.join(keys)] = value
+
+# for name, value in data.items():
+# title = name
+# legend = []
+# dataset = []
+
+# scale_x = []
+
+# for build_id, build_results in value.items():
+# vals = []
+# OD = OrderedDict
+# ordered_build_results = OD(sorted(build_results.items(),
+# key=lambda t: t[0]))
+# scale_x = ordered_build_results.keys()
+# for key in scale_x:
+# res = build_results.get(key)
+# if res:
+# vals.append(res)
+# if vals:
+# dataset.append(vals)
+# legend.append(build_id)
+
+# if dataset:
+# charts_url.append(str(charts.render_vertical_bar
+# (title, legend, dataset, scale_x=scale_x)))
+# return charts_url
+
+
+def render_html(charts_urls, dest, lab_description, info):
+ templ = open("report.html", 'r').read()
+ body = "<a href='#lab_desc'>Lab description</a>" \
+ "<ol>{0}</ol>" \
+ "<div>{1}</div>" \
+ '<a name="lab_desc"></a>' \
+ "<div><ul>{2}</ul></div>"
+ table = "<table><tr><td>{0}</td><td>{1}</td></tr>" \
+ "<tr><td>{2}</td><td>{3}</td></tr></table>"
+ ul = []
+ ol = []
+ li = '<li>{0} : {1}</li>'
+
+ for elem in info:
+ ol.append(li.format(elem.keys(), elem.values()))
+
+ for key in lab_description:
+ value = lab_description[key]
+ ul.append("<li>{0} : {1}</li>".format(key, value))
+
+ charts_urls = ['<img src="{0}">'.format(url) for url in charts_urls]
+
+ body = body.format('\n'.join(ol),
+ table.format(*charts_urls),
+ '\n'.join(ul))
+
+ open(dest, 'w').write(templ % {'body': body})
+
+
+def io_chart(title, concurence, latv, iops_or_bw, iops_or_bw_dev,
+ legend, fname):
+ bar_data, bar_dev = iops_or_bw, iops_or_bw_dev
+ legend = [legend]
+
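+    # per-VM value: total iops/bw divided by the concurrency level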
+ iops_or_bw_per_vm = []
+ for i in range(len(concurence)):
+ iops_or_bw_per_vm.append(iops_or_bw[i] / concurence[i])
+
+ bar_dev_bottom = []
+ bar_dev_top = []
+ for i in range(len(bar_data)):
+ bar_dev_top.append(bar_data[i] + bar_dev[i])
+ bar_dev_bottom.append(bar_data[i] - bar_dev[i])
+
+ latv = [lat / 1000 for lat in latv]
+ ch = charts.render_vertical_bar(title, legend, [bar_data], [bar_dev_top],
+ [bar_dev_bottom], file_name=fname,
+ scale_x=concurence,
+ lines=[
+ (latv, "msec", "rr", "lat"),
+ (iops_or_bw_per_vm, None, None,
+ "bw_per_vm")
+ ])
+ return str(ch)
+
+
+def make_io_report(results, path, lab_url=None, creds=None):
+ if lab_url is not None:
+ username, password, tenant_name = parse_creds(creds)
+ creds = {'username': username,
+ 'password': password,
+ "tenant_name": tenant_name}
+ data = collect_lab_data(lab_url, creds)
+ lab_info = total_lab_info(data)
+ else:
+ lab_info = ""
+
+ for suite_type, test_suite_data in results:
+ if suite_type != 'io' or test_suite_data is None:
+ continue
+
+ io_test_suite_res = test_suite_data['res']
+
+ charts_url = []
+ info = []
+
+ name_filters = [
+ ('hdd_test_rrd4k', ('concurence', 'lat', 'iops'), 'rand_read_4k'),
+ ('hdd_test_swd1m', ('concurence', 'lat', 'bw'), 'seq_write_1m'),
+ ('hdd_test_srd1m', ('concurence', 'lat', 'bw'), 'seq_read_1m'),
+        ('hdd_test_rws4k', ('concurence', 'lat', 'bw'), 'rand_write_4k')
+ ]
+
+ for name_filter, fields, fname in name_filters:
+ th_filter = filter_data(name_filter, fields)
+
+ data = sorted(th_filter(io_test_suite_res.values()))
+ if len(data) == 0:
+ continue
+
+ concurence, latv, iops_or_bw_v = zip(*data)
+ iops_or_bw_v, iops_or_bw_dev_v = zip(*map(med_dev, iops_or_bw_v))
+ latv, _ = zip(*map(med_dev, latv))
+
+ url = io_chart(name_filter, concurence, latv, iops_or_bw_v,
+ iops_or_bw_dev_v,
+ fields[2], fname)
+ info.append(dict(zip(fields, (concurence, latv, iops_or_bw_v))))
+ charts_url.append(url)
+
+ if len(charts_url) != 0:
+ render_html(charts_url, path, lab_info, info)
+
+
+def main(args):
+ make_io_report(results=[('a', 'b')],
+ path=os.path.dirname(args[0]),
+ lab_url='http://172.16.52.112:8000',
+ creds='admin:admin@admin')
+ return 0
+
+
+if __name__ == '__main__':
+    sys.exit(main(sys.argv))
diff --git a/wally/rest_api.py b/wally/rest_api.py
new file mode 100644
index 0000000..fc9bd05
--- /dev/null
+++ b/wally/rest_api.py
@@ -0,0 +1,25 @@
+import json
+import requests
+
+
+def add_test(test_name, test_data, url):
+ if not url.endswith("/"):
+ url += '/api/tests/' + test_name
+ requests.post(url=url, data=json.dumps(test_data))
+
+
+def get_test(test_name, url):
+ if not url.endswith("/"):
+ url += '/api/tests/' + test_name
+
+ result = requests.get(url=url)
+
+ return json.loads(result.content)
+
+
+def get_all_tests(url):
+ if not url.endswith('/'):
+ url += '/api/tests'
+
+ result = requests.get(url=url)
+ return json.loads(result.content)
diff --git a/wally/run_test.py b/wally/run_test.py
new file mode 100755
index 0000000..7899bae
--- /dev/null
+++ b/wally/run_test.py
@@ -0,0 +1,589 @@
+from __future__ import print_function
+
+import os
+import sys
+import time
+import Queue
+import pprint
+import logging
+import argparse
+import threading
+import collections
+
+import yaml
+from concurrent.futures import ThreadPoolExecutor
+
+from wally import pretty_yaml
+from wally.discover import discover, Node, undiscover
+from wally import utils, report, ssh_utils, start_vms
+from wally.suits.itest import IOPerfTest, PgBenchTest
+from wally.config import cfg_dict, load_config, setup_loggers
+from wally.sensors.api import (start_monitoring,
+ deploy_and_start_sensors,
+ SensorConfig)
+
+
+logger = logging.getLogger("wally")
+
+
+def format_result(res, formatter):
+ data = "\n{0}\n".format("=" * 80)
+ data += pprint.pformat(res) + "\n"
+ data += "{0}\n".format("=" * 80)
+ templ = "{0}\n\n====> {1}\n\n{2}\n\n"
+ return templ.format(data, formatter(res), "=" * 80)
+
+
+class Context(object):
+ def __init__(self):
+ self.build_meta = {}
+ self.nodes = []
+ self.clear_calls_stack = []
+ self.openstack_nodes_ids = []
+ self.sensor_cm = None
+ self.keep_vm = False
+ self.sensors_control_queue = None
+ self.sensor_listen_thread = None
+
+
+def connect_one(node):
+ try:
+ ssh_pref = "ssh://"
+ if node.conn_url.startswith(ssh_pref):
+ url = node.conn_url[len(ssh_pref):]
+ logger.debug("Try connect to " + url)
+ node.connection = ssh_utils.connect(url)
+ else:
+ raise ValueError("Unknown url type {0}".format(node.conn_url))
+ except Exception:
+ logger.exception("During connect to {0}".format(node))
+ raise
+
+
+def connect_all(nodes):
+ logger.info("Connecting to nodes")
+ with ThreadPoolExecutor(32) as pool:
+ list(pool.map(connect_one, nodes))
+ logger.info("All nodes connected successfully")
+
+
+def save_sensors_data(q, mon_q, fd):
+ logger.info("Start receiving sensors data")
+ fd.write("\n")
+
+ observed_nodes = set()
+
+ try:
+ while True:
+ val = q.get()
+ if val is None:
+ break
+
+ addr, data = val
+ if addr not in observed_nodes:
+ mon_q.put(addr)
+ observed_nodes.add(addr)
+
+ fd.write("{0!s} : {1!r}\n".format(time.time(), repr(val)))
+ except Exception:
+ logger.exception("Error in sensors thread")
+ logger.info("Sensors thread exits")
+
+
+def test_thread(test, node, barrier, res_q):
+ try:
+ logger.debug("Run preparation for {0}".format(node.conn_url))
+ test.pre_run(node.connection)
+ logger.debug("Run test for {0}".format(node.conn_url))
+ test.run(node.connection, barrier)
+ except Exception as exc:
+ logger.exception("In test {0} for node {1}".format(test, node))
+ res_q.put(exc)
+
+ try:
+ test.cleanup(node.connection)
+ except:
+        msg = "During cleanup - in test {0} for node {1}"
+ logger.exception(msg.format(test, node))
+
+
+def run_tests(test_block, nodes):
+ tool_type_mapper = {
+ "io": IOPerfTest,
+ "pgbench": PgBenchTest,
+ }
+
+ test_nodes = [node for node in nodes
+ if 'testnode' in node.roles]
+ test_number_per_type = {}
+ res_q = Queue.Queue()
+
+ for name, params in test_block.items():
+ logger.info("Starting {0} tests".format(name))
+ test_num = test_number_per_type.get(name, 0)
+ test_number_per_type[name] = test_num + 1
+ threads = []
+ barrier = utils.Barrier(len(test_nodes))
+
+ for node in test_nodes:
+ msg = "Starting {0} test on {1} node"
+ logger.debug(msg.format(name, node.conn_url))
+
+ dr = os.path.join(
+ cfg_dict['test_log_directory'],
+ "{0}_{1}_{2}".format(name, test_num, node.get_ip())
+ )
+
+ if not os.path.exists(dr):
+ os.makedirs(dr)
+
+ test = tool_type_mapper[name](params, res_q.put, dr,
+ node=node.get_ip())
+ th = threading.Thread(None, test_thread, None,
+ (test, node, barrier, res_q))
+ threads.append(th)
+ th.daemon = True
+ th.start()
+
+ def gather_results(res_q, results):
+ while not res_q.empty():
+ val = res_q.get()
+
+ if isinstance(val, Exception):
+ msg = "Exception during test execution: {0}"
+ raise ValueError(msg.format(val.message))
+
+ results.append(val)
+
+ results = []
+
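+        # poll threads with a short join timeout so test failures surface early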
+ while True:
+ for th in threads:
+ th.join(1)
+ gather_results(res_q, results)
+
+ if all(not th.is_alive() for th in threads):
+ break
+
+ gather_results(res_q, results)
+ yield name, test.merge_results(results)
+
+
+def log_nodes_statistic(_, ctx):
+ nodes = ctx.nodes
+ logger.info("Found {0} nodes total".format(len(nodes)))
+ per_role = collections.defaultdict(lambda: 0)
+ for node in nodes:
+ for role in node.roles:
+ per_role[role] += 1
+
+ for role, count in sorted(per_role.items()):
+ logger.debug("Found {0} nodes with role {1}".format(count, role))
+
+
+def connect_stage(cfg, ctx):
+ ctx.clear_calls_stack.append(disconnect_stage)
+ connect_all(ctx.nodes)
+
+
+def make_undiscover_stage(clean_data):
+ def undiscover_stage(cfg, ctx):
+ undiscover(clean_data)
+ return undiscover_stage
+
+
+def discover_stage(cfg, ctx):
+ if cfg.get('discover') is not None:
+ discover_objs = [i.strip() for i in cfg['discover'].strip().split(",")]
+
+ nodes, clean_data = discover(ctx, discover_objs,
+ cfg['clouds'], cfg['var_dir'])
+ ctx.clear_calls_stack.append(make_undiscover_stage(clean_data))
+ ctx.nodes.extend(nodes)
+
+ for url, roles in cfg.get('explicit_nodes', {}).items():
+ ctx.nodes.append(Node(url, roles.split(",")))
+
+
+def deploy_sensors_stage(cfg_dict, ctx):
+ if 'sensors' not in cfg_dict:
+ return
+
+ cfg = cfg_dict.get('sensors')
+
+ sensors_configs = []
+ monitored_nodes = []
+
+ for role, sensors_str in cfg["roles_mapping"].items():
+ sensors = [sens.strip() for sens in sensors_str.split(",")]
+
+ collect_cfg = dict((sensor, {}) for sensor in sensors)
+
+ for node in ctx.nodes:
+ if role in node.roles:
+ monitored_nodes.append(node)
+ sens_cfg = SensorConfig(node.connection,
+ node.get_ip(),
+ collect_cfg)
+ sensors_configs.append(sens_cfg)
+
+ if len(monitored_nodes) == 0:
+        logger.info("Nothing to monitor, no sensors will be installed")
+ return
+
+ ctx.receiver_uri = cfg["receiver_uri"]
+ nodes_ips = [node.get_ip() for node in monitored_nodes]
+ if '{ip}' in ctx.receiver_uri:
+ ips = set(map(utils.get_ip_for_target, nodes_ips))
+
+ if len(ips) > 1:
+ raise ValueError("Can't select external ip for sensors server")
+
+ if len(ips) == 0:
+ raise ValueError("Can't find any external ip for sensors server")
+
+ ext_ip = list(ips)[0]
+ ctx.receiver_uri = ctx.receiver_uri.format(ip=ext_ip)
+
+ ctx.clear_calls_stack.append(remove_sensors_stage)
+ ctx.sensor_cm = start_monitoring(ctx.receiver_uri, sensors_configs)
+
+ ctx.sensors_control_queue = ctx.sensor_cm.__enter__()
+
+ mon_q = Queue.Queue()
+
+ fd = open(cfg_dict['sensor_storage'], "w")
+ th = threading.Thread(None, save_sensors_data, None,
+ (ctx.sensors_control_queue, mon_q, fd))
+ th.daemon = True
+ th.start()
+ ctx.sensor_listen_thread = th
+
+ nodes_ips_set = set(nodes_ips)
+ MAX_WAIT_FOR_SENSORS = 10
+ etime = time.time() + MAX_WAIT_FOR_SENSORS
+
+ msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
+ logger.debug(msg.format(MAX_WAIT_FOR_SENSORS, len(nodes_ips_set)))
+
+ # wait till all nodes start sending data
+ while len(nodes_ips_set) != 0:
+ tleft = etime - time.time()
+ try:
+ data = mon_q.get(True, tleft)
+ ip, port = data
+ except Queue.Empty:
+ msg = "Node {0} not sending any sensor data in {1}s"
+ msg = msg.format(", ".join(nodes_ips_set), MAX_WAIT_FOR_SENSORS)
+ raise RuntimeError(msg)
+
+ if ip not in nodes_ips_set:
+ logger.warning("Received sensor data from unexpected node: {0}".format(ip))
+ continue
+
+ nodes_ips_set.remove(ip)
+
+
+def remove_sensors_stage(cfg, ctx):
+ if ctx.sensor_cm is not None:
+ ctx.sensor_cm.__exit__(None, None, None)
+
+ if ctx.sensors_control_queue is not None:
+ ctx.sensors_control_queue.put(None)
+
+ if ctx.sensor_listen_thread is not None:
+ ctx.sensor_listen_thread.join()
+
+
+def get_os_credentials(cfg, ctx, creds_type):
+ creds = None
+
+ if creds_type == 'clouds':
+ if 'openstack' in cfg['clouds']:
+ os_cfg = cfg['clouds']['openstack']
+
+ tenant = os_cfg['OS_TENANT_NAME'].strip()
+ user = os_cfg['OS_USERNAME'].strip()
+ passwd = os_cfg['OS_PASSWORD'].strip()
+ auth_url = os_cfg['OS_AUTH_URL'].strip()
+
+ elif 'fuel' in cfg['clouds'] and \
+ 'openstack_env' in cfg['clouds']['fuel']:
+ creds = ctx.fuel_openstack_creds
+
+ elif creds_type == 'ENV':
+ user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
+ elif os.path.isfile(creds_type):
+ raise NotImplementedError()
+ # user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
+ else:
+ msg = "Creds {0!r} isn't supported".format(creds_type)
+ raise ValueError(msg)
+
+ if creds is None:
+ creds = {'name': user,
+ 'passwd': passwd,
+ 'tenant': tenant,
+ 'auth_url': auth_url}
+
+ return creds
+
+
+def run_tests_stage(cfg, ctx):
+ ctx.results = []
+
+ if 'tests' not in cfg:
+ return
+
+ for group in cfg['tests']:
+
+ assert len(group.items()) == 1
+ key, config = group.items()[0]
+
+ if 'start_test_nodes' == key:
+ params = config['vm_params'].copy()
+ os_nodes_ids = []
+
+ os_creds_type = config['creds']
+ os_creds = get_os_credentials(cfg, ctx, os_creds_type)
+
+ start_vms.nova_connect(**os_creds)
+
+ logger.info("Preparing openstack")
+ start_vms.prepare_os_subpr(**os_creds)
+
+ new_nodes = []
+ try:
+ params['group_name'] = cfg_dict['run_uuid']
+ for new_node, node_id in start_vms.launch_vms(params):
+ new_node.roles.append('testnode')
+ ctx.nodes.append(new_node)
+ os_nodes_ids.append(node_id)
+ new_nodes.append(new_node)
+
+ store_nodes_in_log(cfg, os_nodes_ids)
+ ctx.openstack_nodes_ids = os_nodes_ids
+
+ connect_all(new_nodes)
+
+ # deploy sensors on new nodes
+ # unify this code
+ if 'sensors' in cfg:
+ sens_cfg = []
+ sensors_str = cfg["sensors"]["roles_mapping"]['testnode']
+ sensors = [sens.strip() for sens in sensors_str.split(",")]
+
+ collect_cfg = dict((sensor, {}) for sensor in sensors)
+ for node in new_nodes:
+ sens_cfg.append((node.connection, collect_cfg))
+
+ uri = cfg["sensors"]["receiver_uri"]
+ logger.debug("Installing sensors on vm's")
+ deploy_and_start_sensors(uri, None,
+ connected_config=sens_cfg)
+
+ for test_group in config.get('tests', []):
+ ctx.results.extend(run_tests(test_group, ctx.nodes))
+
+ finally:
+ if not ctx.keep_vm:
+ shut_down_vms_stage(cfg, ctx)
+
+ else:
+ ctx.results.extend(run_tests(group, ctx.nodes))
+
+
+def shut_down_vms_stage(cfg, ctx):
+ vm_ids_fname = cfg_dict['vm_ids_fname']
+ if ctx.openstack_nodes_ids is None:
+ nodes_ids = open(vm_ids_fname).read().split()
+ else:
+ nodes_ids = ctx.openstack_nodes_ids
+
+ if len(nodes_ids) != 0:
+ logger.info("Removing nodes")
+ start_vms.clear_nodes(nodes_ids)
+ logger.info("Nodes has been removed")
+
+ if os.path.exists(vm_ids_fname):
+ os.remove(vm_ids_fname)
+
+
+def store_nodes_in_log(cfg, nodes_ids):
+ with open(cfg['vm_ids_fname'], 'w') as fd:
+ fd.write("\n".join(nodes_ids))
+
+
+def clear_enviroment(cfg, ctx):
+ if os.path.exists(cfg_dict['vm_ids_fname']):
+ shut_down_vms_stage(cfg, ctx)
+
+
+def disconnect_stage(cfg, ctx):
+ ssh_utils.close_all_sessions()
+
+ for node in ctx.nodes:
+ if node.connection is not None:
+ node.connection.close()
+
+
+def yamable(data):
+ if isinstance(data, (tuple, list)):
+ return map(yamable, data)
+
+ if isinstance(data, unicode):
+ return str(data)
+
+ if isinstance(data, dict):
+ res = {}
+ for k, v in data.items():
+ res[yamable(k)] = yamable(v)
+ return res
+
+ return data
+
+
+def store_raw_results_stage(cfg, ctx):
+
+ raw_results = os.path.join(cfg_dict['var_dir'], 'raw_results.yaml')
+
+ if os.path.exists(raw_results):
+ cont = yaml.load(open(raw_results).read())
+ else:
+ cont = []
+
+ cont.extend(yamable(ctx.results))
+ raw_data = pretty_yaml.dumps(cont)
+
+ with open(raw_results, "w") as fd:
+ fd.write(raw_data)
+
+
+def console_report_stage(cfg, ctx):
+ for tp, data in ctx.results:
+ if 'io' == tp and data is not None:
+ print(IOPerfTest.format_for_console(data))
+
+
+def report_stage(cfg, ctx):
+ html_rep_fname = cfg['html_report_file']
+
+ try:
+ fuel_url = cfg['clouds']['fuel']['url']
+ except KeyError:
+ fuel_url = None
+
+ try:
+ creds = cfg['clouds']['fuel']['creds']
+ except KeyError:
+ creds = None
+
+ report.make_io_report(ctx.results, html_rep_fname, fuel_url, creds=creds)
+
+ logger.info("Html report were stored in " + html_rep_fname)
+
+ text_rep_fname = cfg_dict['text_report_file']
+ with open(text_rep_fname, "w") as fd:
+ for tp, data in ctx.results:
+ if 'io' == tp and data is not None:
+ fd.write(IOPerfTest.format_for_console(data))
+ fd.write("\n")
+ fd.flush()
+
+ logger.info("Text report were stored in " + text_rep_fname)
+
+
+def complete_log_nodes_statistic(cfg, ctx):
+ nodes = ctx.nodes
+ for node in nodes:
+ logger.debug(str(node))
+
+
+def load_data_from(var_dir):
+ def load_data_from_file(cfg, ctx):
+ raw_results = os.path.join(var_dir, 'raw_results.yaml')
+ ctx.results = yaml.load(open(raw_results).read())
+ return load_data_from_file
+
+
+def parse_args(argv):
+ descr = "Disk io performance test suite"
+ parser = argparse.ArgumentParser(prog='wally', description=descr)
+
+ parser.add_argument("-l", dest='extra_logs',
+ action='store_true', default=False,
+ help="print some extra log info")
+ parser.add_argument("-b", '--build_description',
+ type=str, default="Build info")
+ parser.add_argument("-i", '--build_id', type=str, default="id")
+ parser.add_argument("-t", '--build_type', type=str, default="GA")
+ parser.add_argument("-u", '--username', type=str, default="admin")
+ parser.add_argument("-p", '--post-process-only', metavar="VAR_DIR",
+ help="Only process data from previour run")
+ parser.add_argument("-k", '--keep-vm', action='store_true',
+ help="Don't remove test vm's", default=False)
+ parser.add_argument("config_file")
+
+ return parser.parse_args(argv[1:])
+
+
+def main(argv):
+ opts = parse_args(argv)
+
+ if opts.post_process_only is not None:
+ stages = [
+ load_data_from(opts.post_process_only),
+ console_report_stage,
+ report_stage
+ ]
+ else:
+ stages = [
+ discover_stage,
+ log_nodes_statistic,
+ connect_stage,
+ deploy_sensors_stage,
+ run_tests_stage,
+ store_raw_results_stage,
+ console_report_stage,
+ report_stage
+ ]
+
+ load_config(opts.config_file, opts.post_process_only)
+
+ if cfg_dict.get('logging', {}).get("extra_logs", False) or opts.extra_logs:
+ level = logging.DEBUG
+ else:
+ level = logging.WARNING
+
+ setup_loggers(level, cfg_dict['log_file'])
+
+ logger.info("All info would be stored into {0}".format(
+ cfg_dict['var_dir']))
+
+ ctx = Context()
+ ctx.build_meta['build_id'] = opts.build_id
+ ctx.build_meta['build_descrption'] = opts.build_description
+ ctx.build_meta['build_type'] = opts.build_type
+ ctx.build_meta['username'] = opts.username
+ ctx.keep_vm = opts.keep_vm
+
+ try:
+ for stage in stages:
+ logger.info("Start {0.__name__} stage".format(stage))
+ stage(cfg_dict, ctx)
+ except Exception as exc:
+ msg = "Exception during current stage: {0}".format(exc.message)
+ logger.error(msg)
+ finally:
+ exc_type, exc_val, exc_tb = sys.exc_info()
+ for stage in ctx.clear_calls_stack[::-1]:
+ try:
+ logger.info("Start {0.__name__} stage".format(stage))
+ stage(cfg_dict, ctx)
+ except Exception:
+ logger.exception("During {0.__name__} stage".format(stage))
+
+ if exc_type is not None:
+ raise exc_type, exc_val, exc_tb
+
+ logger.info("All info stored into {0}".format(cfg_dict['var_dir']))
+ return 0
diff --git a/wally/sensors/__init__.py b/wally/sensors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wally/sensors/__init__.py
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
new file mode 100644
index 0000000..f66bb36
--- /dev/null
+++ b/wally/sensors/api.py
@@ -0,0 +1,58 @@
+import Queue
+import threading
+from contextlib import contextmanager
+
+from .deploy_sensors import (deploy_and_start_sensors,
+ stop_and_remove_sensors)
+from .protocol import create_protocol, Timeout
+
+
+__all__ = ['Empty', 'recv_main', 'start_monitoring',
+ 'deploy_and_start_sensors', 'SensorConfig']
+
+
+Empty = Queue.Empty
+
+
+class SensorConfig(object):
+ def __init__(self, conn, url, sensors):
+ self.conn = conn
+ self.url = url
+ self.sensors = sensors
+
+
+def recv_main(proto, data_q, cmd_q):
+ while True:
+ try:
+ data_q.put(proto.recv(0.1))
+ except Timeout:
+ pass
+
+ try:
+ val = cmd_q.get(False)
+
+ if val is None:
+ return
+
+ except Queue.Empty:
+ pass
+
+
+@contextmanager
+def start_monitoring(uri, configs):
+ deploy_and_start_sensors(uri, configs)
+ try:
+ data_q = Queue.Queue()
+ cmd_q = Queue.Queue()
+ proto = create_protocol(uri, receiver=True)
+ th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
+ th.daemon = True
+ th.start()
+
+ try:
+ yield data_q
+ finally:
+ cmd_q.put(None)
+ th.join()
+ finally:
+ stop_and_remove_sensors(configs)
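+
+# Illustrative usage sketch (not part of the original patch); the ssh
+# connection object "conn" and the node IP below are placeholders:
+#
+#   cfg = SensorConfig(conn, "192.168.0.1",
+#                      {"block-io": {}, "net-io": {}})
+#   with start_monitoring("udp://192.168.0.1:12001", [cfg]) as data_q:
+#       try:
+#           print data_q.get(True, 1)
+#       except Empty:
+#           pass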
diff --git a/wally/sensors/cp_protocol.py b/wally/sensors/cp_protocol.py
new file mode 100644
index 0000000..4e96afe
--- /dev/null
+++ b/wally/sensors/cp_protocol.py
@@ -0,0 +1,227 @@
+#!/usr/bin/env python
+""" Protocol class """
+
+import re
+import zlib
+import json
+import logging
+import binascii
+
+
+logger = logging.getLogger("wally")
+
+
+# The protocol uses two types of packets:
+# 1 - header, which contains the template schema of counters
+# 2 - body, which contains only the values, in the same order as the template
+# It uses msgpack (or the provided packer) to keep packets small.
+#
+# packet format:
+# begin_data_prefixSIZE\n\rCRC\n\rDATAend_data_postfix
+# packet part format:
+# SIZE\n\rDATA
+#
+# DATA is zlib-compressed
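+#
+# Illustrative single-part packet as produced by Packet.create_packet
+# (sizes and crc are placeholders, not from the original code):
+#
+#   "137\n\rbegin_data_prefix137\n\r<crc32>\n\r<zlib-compressed DATA>end_data_postfix"
+#
+# where 137 is the length of the uncompressed DATA and <crc32> its checksum.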
+
+
+class PacketException(Exception):
+ """ Exceptions from Packet"""
+ pass
+
+
+class Packet(object):
+ """ Class proceed packet by protocol"""
+
+ prefix = "begin_data_prefix"
+ postfix = "end_data_postfix"
+ header_prefix = "template"
+ # other fields
+ # is_begin
+ # is_end
+ # crc
+ # data
+ # data_len
+
+ def __init__(self, packer):
+ # preinit
+ self.is_begin = False
+ self.is_end = False
+ self.crc = None
+ self.data = ""
+ self.data_len = None
+ self.srv_template = None
+ self.clt_template = None
+ self.tmpl_size = 0
+ self.packer = packer
+
+ def new_packet(self, part):
+ """ New packet adding """
+ # proceed packet
+ try:
+ # get size
+ local_size_s, _, part = part.partition("\n\r")
+ local_size = int(local_size_s)
+
+ # find prefix
+ begin = part.find(self.prefix)
+ if begin != -1:
+ # divide data if something before begin and prefix
+ from_i = begin + len(self.prefix)
+ part = part[from_i:]
+ # reset flags
+ self.is_begin = True
+ self.is_end = False
+ self.data = ""
+ # get size
+ data_len_s, _, part = part.partition("\n\r")
+ self.data_len = int(data_len_s)
+ # get crc
+ crc_s, _, part = part.partition("\n\r")
+ self.crc = int(crc_s)
+
+ # bad size?
+ if local_size != self.data_len:
+ raise PacketException("Part size error")
+
+ # find postfix
+ end = part.find(self.postfix)
+ if end != -1:
+ # divide postfix
+ part = part[:end]
+ self.is_end = True
+
+ self.data += part
+ # check if it is end
+ if self.is_end:
+ self.data = zlib.decompress(self.data)
+ if self.data_len != len(self.data):
+ raise PacketException("Total size error")
+ if binascii.crc32(self.data) != self.crc:
+ raise PacketException("CRC error")
+
+ # check, if it is template
+ if self.data.startswith(self.header_prefix):
+ self.srv_template = self.data
+ # template is for internal use
+ return None
+
+ # decode values list
+ vals = self.packer.unpack(self.data)
+ dump = self.srv_template % tuple(vals)
+ return dump
+ else:
+ return None
+
+ except PacketException as e:
+ # if something wrong - skip packet
+ logger.warning("Packet skipped: %s", e)
+ self.is_begin = False
+ self.is_end = False
+ return None
+
+ except TypeError:
+ # if something wrong - skip packet
+ logger.warning("Packet skipped: doesn't match schema")
+ self.is_begin = False
+ self.is_end = False
+ return None
+
+ except:
+ # if something at all wrong - skip packet
+ logger.warning("Packet skipped: something is wrong")
+ self.is_begin = False
+ self.is_end = False
+ return None
+
+ @staticmethod
+ def create_packet(data, part_size):
+ """ Create packet divided by parts with part_size from data
+ No compression here """
+ # prepare data
+ data_len = "%i\n\r" % len(data)
+ header = "%s%s%s\n\r" % (Packet.prefix, data_len, binascii.crc32(data))
+ compact_data = zlib.compress(data)
+ packet = "%s%s%s" % (header, compact_data, Packet.postfix)
+
+ partheader_len = len(data_len)
+
+ beg = 0
+ end = part_size - partheader_len
+
+ result = []
+ while beg < len(packet):
+ block = packet[beg:beg+end]
+ result.append(data_len + block)
+ beg += end
+
+ return result
+
+ def create_packet_v2(self, data, part_size):
+ """ Create packet divided by parts with part_size from data
+ Compressed """
+ result = []
+ # create and add to result template header
+ if self.srv_template is None:
+ perf_string = json.dumps(data)
+ self.create_answer_template(perf_string)
+ template = self.header_prefix + self.srv_template
+ header = Packet.create_packet(template, part_size)
+ result.extend(header)
+
+ vals = self.get_matching_value_list(data)
+ body = self.packer.pack(vals)
+ parts = Packet.create_packet(body, part_size)
+ result.extend(parts)
+ return result
+
+ def get_matching_value_list(self, data):
+ """ Get values in order server expect"""
+ vals = range(0, self.tmpl_size)
+
+ try:
+ for node, groups in self.clt_template.items():
+ for group, counters in groups.items():
+ for counter, index in counters.items():
+ if not isinstance(index, dict):
+ vals[index] = data[node][group][counter]
+ else:
+ for k, i in index.items():
+ vals[i] = data[node][group][counter][k]
+
+ return vals
+
+ except (IndexError, KeyError):
+ logger = logging.getLogger(__name__)
+ logger.error("Data don't match last schema")
+ raise PacketException("Data don't match last schema")
+
+ def create_answer_template(self, perf_string):
+ """ Create template for server to insert counter values
+ Return tuple of server and clien templates + number of replaces"""
+ replacer = re.compile(": [0-9]+\.?[0-9]*")
+ # replace all values by %s
+ finditer = replacer.finditer(perf_string)
+ # the server does not need to know positions
+ self.srv_template = ""
+ # the client needs positions
+ clt_template = ""
+ beg = 0
+ k = 0
+ # this could be done better?
+ for match in finditer:
+ # define input place in server template
+ self.srv_template += perf_string[beg:match.start()]
+ self.srv_template += ": %s"
+ # define match number in client template
+ clt_template += perf_string[beg:match.start()]
+ clt_template += ": %i" % k
+
+ beg = match.end()
+ k += 1
+
+ # add tail
+ self.srv_template += perf_string[beg:]
+ clt_template += perf_string[beg:]
+
+ self.tmpl_size = k
+ self.clt_template = json.loads(clt_template)
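+
+# Illustrative example (not from the original code): for the counters dict
+# {"cpu": {"user": 12}}, create_answer_template('{"cpu": {"user": 12}}')
+# stores srv_template == '{"cpu": {"user": %s}}',
+# clt_template == {"cpu": {"user": 0}} and tmpl_size == 1.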
diff --git a/wally/sensors/cp_transport.py b/wally/sensors/cp_transport.py
new file mode 100644
index 0000000..2e00e80
--- /dev/null
+++ b/wally/sensors/cp_transport.py
@@ -0,0 +1,157 @@
+#!/usr/bin/env python
+""" UDP sender class """
+
+import socket
+import urlparse
+
+from cp_protocol import Packet
+
+try:
+ from disk_perf_test_tool.logger import define_logger
+ logger = define_logger(__name__)
+except ImportError:
+ class Logger(object):
+ def debug(self, *dt):
+ pass
+ logger = Logger()
+
+
+class SenderException(Exception):
+ """ Exceptions in Sender class """
+ pass
+
+
+class Timeout(Exception):
+ """ Exceptions in Sender class """
+ pass
+
+
+class Sender(object):
+ """ UDP sender class """
+
+ def __init__(self, packer, url=None, port=None, host="0.0.0.0", size=256):
+ """ Create connection object from input udp string or params"""
+
+ # test input
+ if url is None and port is None:
+ raise SenderException("Bad initialization")
+ if url is not None:
+ data = urlparse.urlparse(url)
+ # check schema
+ if data.scheme != "udp":
+ mes = "Bad protocol type: %s instead of UDP" % data.scheme
+ logger.error(mes)
+ raise SenderException("Bad protocol type")
+ # try to get port
+ try:
+ int_port = int(data.port)
+ except ValueError:
+ logger.error("Bad UDP port")
+ raise SenderException("Bad UDP port")
+ # save paths
+ self.sendto = (data.hostname, int_port)
+ self.bindto = (data.hostname, int_port)
+ # try to get size
+ try:
+ self.size = int(data.path.strip("/"))
+ except ValueError:
+ logger.error("Bad packet part size")
+ raise SenderException("Bad packet part size")
+ else:
+ # url is None - use size and port
+ self.sendto = (host, port)
+ self.bindto = ("0.0.0.0", port)
+ self.size = size
+
+ self.packer = packer
+
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.binded = False
+ self.all_data = {}
+ self.send_packer = None
+
+ def bind(self):
+ """ Prepare for listening """
+ self.sock.bind(self.bindto)
+ self.sock.settimeout(0.5)
+ self.binded = True
+
+ def send(self, data):
+ """ Send data to udp socket"""
+ if self.sock.sendto(data, self.sendto) != len(data):
+ mes = "Cannot send data to %s:%s" % self.sendto
+ logger.error(mes)
+ raise SenderException("Cannot send data")
+
+ def send_by_protocol(self, data):
+ """ Send data by Packet protocol
+ data = dict"""
+ if self.send_packer is None:
+ self.send_packer = Packet(self.packer())
+ parts = self.send_packer.create_packet_v2(data, self.size)
+ for part in parts:
+ self.send(part)
+
+ def recv(self):
+ """ Receive data from udp socket"""
+ # check for binding
+ if not self.binded:
+ self.bind()
+ # try to recv
+ try:
+ data, (remote_ip, _) = self.sock.recvfrom(self.size)
+ return data, remote_ip
+ except socket.timeout:
+ raise Timeout()
+
+ def recv_by_protocol(self):
+ """ Receive data from udp socket by Packet protocol"""
+ data, remote_ip = self.recv()
+
+ if remote_ip not in self.all_data:
+ self.all_data[remote_ip] = Packet(self.packer())
+
+ return self.all_data[remote_ip].new_packet(data)
+
+ def recv_with_answer(self, stop_event=None):
+ """ Receive data from udp socket and send 'ok' back
+ Command port = local port + 1
+ Answer port = local port
+ Waiting for command is blocking """
+ # create command socket
+ command_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ command_port = self.bindto[1]+1
+ command_sock.bind(("0.0.0.0", command_port))
+ command_sock.settimeout(1)
+ # try to recv
+ while True:
+ try:
+ data, (remote_ip, _) = command_sock.recvfrom(self.size)
+ self.send("ok")
+ return data, remote_ip
+ except socket.timeout:
+ if stop_event is not None and stop_event.is_set():
+ # return None if we are interrupted
+ return None
+
+ def verified_send(self, send_host, message, max_repeat=20):
+ """ Send and verify it by answer not more then max_repeat
+ Send port = local port + 1
+ Answer port = local port
+ Return True if send is verified """
+ # create send socket
+ send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ send_port = self.sendto[1]+1
+ for repeat in range(0, max_repeat):
+ send_sock.sendto(message, (send_host, send_port))
+ try:
+ data, remote_ip = self.recv()
+ if remote_ip == send_host and data == "ok":
+ return True
+ else:
+ logger.warning("No answer from %s, try %i",
+ send_host, repeat)
+ except Timeout:
+ logger.warning("No answer from %s, try %i", send_host, repeat)
+
+ return False
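+
+# Minimal usage sketch (illustrative, not part of the original patch);
+# "Packer" stands for any serializer class whose pack()/unpack() accept a
+# plain list of values, e.g. the msgpack-based serializer in protocol.py:
+#
+#   sender = Sender(packer=Packer, url="udp://127.0.0.1:5001/256")
+#   sender.send_by_protocol({"node1": {"block-io": {"io_time": 10}}})
+#
+#   receiver = Sender(packer=Packer, port=5001, size=256)
+#   dump = receiver.recv_by_protocol()   # None until a full packet arrives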
diff --git a/wally/sensors/daemonize.py b/wally/sensors/daemonize.py
new file mode 100644
index 0000000..a4fa157
--- /dev/null
+++ b/wally/sensors/daemonize.py
@@ -0,0 +1,226 @@
+# #!/usr/bin/python
+
+import fcntl
+import os
+import pwd
+import grp
+import sys
+import signal
+import resource
+import logging
+import atexit
+from logging import handlers
+
+
+class Daemonize(object):
+ """ Daemonize object
+ Object constructor expects the following arguments:
+ - app: contains the application name which will be sent to syslog.
+ - pid: path to the pidfile.
+ - action: your custom function which will be executed after daemonization.
+ - keep_fds: optional list of fds which should not be closed.
+ - auto_close_fds: optional parameter to not close opened fds.
+ - privileged_action: action that will be executed before
+ drop privileges if user or
+ group parameter is provided.
+ If you want to transfer anything from privileged
+ action to action, such as opened privileged file
+ descriptor, you should return it from
+ privileged_action function and catch it inside action
+ function.
+ - user: drop privileges to this user if provided.
+ - group: drop privileges to this group if provided.
+ - verbose: send debug messages to logger if provided.
+ - logger: use this logger object instead of creating new one, if provided.
+ """
+ def __init__(self, app, pid, action, keep_fds=None, auto_close_fds=True,
+ privileged_action=None, user=None, group=None, verbose=False,
+ logger=None):
+ self.app = app
+ self.pid = pid
+ self.action = action
+ self.keep_fds = keep_fds or []
+ self.privileged_action = privileged_action or (lambda: ())
+ self.user = user
+ self.group = group
+ self.logger = logger
+ self.verbose = verbose
+ self.auto_close_fds = auto_close_fds
+
+ def sigterm(self, signum, frame):
+ """ sigterm method
+ These actions will be done after SIGTERM.
+ """
+ self.logger.warn("Caught signal %s. Stopping daemon." % signum)
+ os.remove(self.pid)
+ sys.exit(0)
+
+ def exit(self):
+ """ exit method
+ Cleanup pid file at exit.
+ """
+ self.logger.warn("Stopping daemon.")
+ os.remove(self.pid)
+ sys.exit(0)
+
+ def start(self):
+ """ start method
+ Main daemonization process.
+ """
+ # If the pidfile already exists, read the pid from it so we can
+ # restore the file if locking fails (the locking attempt purges
+ # the file contents).
+ if os.path.isfile(self.pid):
+ with open(self.pid, "r") as old_pidfile:
+ old_pid = old_pidfile.read()
+ # Create a lockfile so that only one instance of this daemon is
+ # running at any time.
+ try:
+ lockfile = open(self.pid, "w")
+ except IOError:
+ print("Unable to create the pidfile.")
+ sys.exit(1)
+ try:
+ # Try to get an exclusive lock on the file. This will fail if
+ # another process has the file
+ # locked.
+ fcntl.flock(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except IOError:
+ print("Unable to lock on the pidfile.")
+ # We need to overwrite the pidfile if we got here.
+ with open(self.pid, "w") as pidfile:
+ pidfile.write(old_pid)
+ sys.exit(1)
+
+ # Fork, creating a new process for the child.
+ process_id = os.fork()
+ if process_id < 0:
+ # Fork error. Exit badly.
+ sys.exit(1)
+ elif process_id != 0:
+ # This is the parent process. Exit.
+ sys.exit(0)
+ # This is the child process. Continue.
+
+ # Stop listening for signals that the parent process receives.
+ # This is done by getting a new process id.
+ # setpgrp() is an alternative to setsid().
+ # setsid puts the process in a new parent group and detaches
+ # its controlling terminal.
+ process_id = os.setsid()
+ if process_id == -1:
+ # Uh oh, there was a problem.
+ sys.exit(1)
+
+ # Add lockfile to self.keep_fds.
+ self.keep_fds.append(lockfile.fileno())
+
+ # Close all file descriptors, except the ones mentioned in
+ # self.keep_fds.
+ devnull = "/dev/null"
+ if hasattr(os, "devnull"):
+ # Python has set os.devnull on this system, use it instead as it
+ # might be different
+ # than /dev/null.
+ devnull = os.devnull
+
+ if self.auto_close_fds:
+ for fd in range(3, resource.getrlimit(resource.RLIMIT_NOFILE)[0]):
+ if fd not in self.keep_fds:
+ try:
+ os.close(fd)
+ except OSError:
+ pass
+
+ devnull_fd = os.open(devnull, os.O_RDWR)
+ os.dup2(devnull_fd, 0)
+ os.dup2(devnull_fd, 1)
+ os.dup2(devnull_fd, 2)
+
+ if self.logger is None:
+ # Initialize logging.
+ self.logger = logging.getLogger(self.app)
+ self.logger.setLevel(logging.DEBUG)
+ # Display log messages only on defined handlers.
+ self.logger.propagate = False
+
+ # Initialize syslog.
+ # It will correctly work on OS X, Linux and FreeBSD.
+ if sys.platform == "darwin":
+ syslog_address = "/var/run/syslog"
+ else:
+ syslog_address = "/dev/log"
+
+ # We will continue with syslog initialization only if we
+ # actually have such capabilities
+ # on the machine we are running on.
+ if os.path.isfile(syslog_address):
+ syslog = handlers.SysLogHandler(syslog_address)
+ if self.verbose:
+ syslog.setLevel(logging.DEBUG)
+ else:
+ syslog.setLevel(logging.INFO)
+ # Try to mimic to normal syslog messages.
+ format_t = "%(asctime)s %(name)s: %(message)s"
+ formatter = logging.Formatter(format_t,
+ "%b %e %H:%M:%S")
+ syslog.setFormatter(formatter)
+
+ self.logger.addHandler(syslog)
+
+ # Set umask to default to safe file permissions when running
+ # as a root daemon. 027 is an
+ # octal number which we are typing as 0o27 for Python3 compatibility.
+ os.umask(0o27)
+
+ # Change to a known directory. If this isn't done, starting a daemon
+ # in a subdirectory that
+ # needs to be deleted results in "directory busy" errors.
+ os.chdir("/")
+
+ # Execute privileged action
+ privileged_action_result = self.privileged_action()
+ if not privileged_action_result:
+ privileged_action_result = []
+
+ # Change gid
+ if self.group:
+ try:
+ gid = grp.getgrnam(self.group).gr_gid
+ except KeyError:
+ self.logger.error("Group {0} not found".format(self.group))
+ sys.exit(1)
+ try:
+ os.setgid(gid)
+ except OSError:
+ self.logger.error("Unable to change gid.")
+ sys.exit(1)
+
+ # Change uid
+ if self.user:
+ try:
+ uid = pwd.getpwnam(self.user).pw_uid
+ except KeyError:
+ self.logger.error("User {0} not found.".format(self.user))
+ sys.exit(1)
+ try:
+ os.setuid(uid)
+ except OSError:
+ self.logger.error("Unable to change uid.")
+ sys.exit(1)
+
+ try:
+ lockfile.write("%s" % (os.getpid()))
+ lockfile.flush()
+ except IOError:
+ self.logger.error("Unable to write pid to the pidfile.")
+ print("Unable to write pid to the pidfile.")
+ sys.exit(1)
+
+ # Set custom action on SIGTERM.
+ signal.signal(signal.SIGTERM, self.sigterm)
+ atexit.register(self.exit)
+
+ self.logger.warn("Starting daemon.")
+
+ self.action(*privileged_action_result)
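+
+# Minimal usage sketch (illustrative); it mirrors how sensors/main.py starts
+# the collection loop as a daemon:
+#
+#   def run():
+#       while True:
+#           collect_and_send()   # placeholder for the real work
+#
+#   Daemonize(app="perfcollect_app", pid="/tmp/sensors.pid",
+#             action=run).start()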
diff --git a/wally/sensors/deploy_sensors.py b/wally/sensors/deploy_sensors.py
new file mode 100644
index 0000000..249adfb
--- /dev/null
+++ b/wally/sensors/deploy_sensors.py
@@ -0,0 +1,87 @@
+import time
+import json
+import os.path
+import logging
+
+from concurrent.futures import ThreadPoolExecutor, wait
+
+from wally.ssh_utils import copy_paths, run_over_ssh
+
+logger = logging.getLogger('wally')
+
+
+def wait_all_ok(futures):
+ return all(future.result() for future in futures)
+
+
+def deploy_and_start_sensors(monitor_uri, sensor_configs,
+ remote_path='/tmp/sensors/sensors'):
+
+ paths = {os.path.dirname(__file__): remote_path}
+ with ThreadPoolExecutor(max_workers=32) as executor:
+ futures = []
+
+ for node_sensor_config in sensor_configs:
+ futures.append(executor.submit(deploy_and_start_sensor,
+ paths,
+ node_sensor_config,
+ monitor_uri,
+ remote_path))
+
+ if not wait_all_ok(futures):
+ raise RuntimeError("Sensor deployment fails on some nodes")
+
+
+def deploy_and_start_sensor(paths, node_sensor_config,
+ monitor_uri, remote_path):
+ try:
+ copy_paths(node_sensor_config.conn, paths)
+ sftp = node_sensor_config.conn.open_sftp()
+
+ config_remote_path = os.path.join(remote_path, "conf.json")
+
+ with sftp.open(config_remote_path, "w") as fd:
+ fd.write(json.dumps(node_sensor_config.sensors))
+
+ cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
+ "sensors.main -d start -u {1} {2}"
+
+ cmd = cmd_templ.format(os.path.dirname(remote_path),
+ monitor_uri,
+ config_remote_path)
+
+ run_over_ssh(node_sensor_config.conn, cmd,
+ node=node_sensor_config.url)
+ sftp.close()
+
+ except:
+ msg = "During deploing sensors in {0}".format(node_sensor_config.url)
+ logger.exception(msg)
+ return False
+ return True
+
+
+def stop_and_remove_sensor(conn, url, remote_path):
+ cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'
+
+ run_over_ssh(conn, cmd.format(remote_path), node=url)
+
+ # give the sensor daemon a moment to stop before removing its files
+ time.sleep(0.3)
+
+ conn.exec_command("rm -rf {0}".format(remote_path))
+
+ logger.debug("Sensors stopped and removed")
+
+
+def stop_and_remove_sensors(configs, remote_path='/tmp/sensors'):
+ with ThreadPoolExecutor(max_workers=32) as executor:
+ futures = []
+
+ for node_sensor_config in configs:
+ futures.append(executor.submit(stop_and_remove_sensor,
+ node_sensor_config.conn,
+ node_sensor_config.url,
+ remote_path))
+
+ wait(futures)
diff --git a/wally/sensors/discover.py b/wally/sensors/discover.py
new file mode 100644
index 0000000..f227043
--- /dev/null
+++ b/wally/sensors/discover.py
@@ -0,0 +1,9 @@
+all_sensors = {}
+
+
+def provides(sensor_class_name):
+ def closure(func):
+ assert sensor_class_name not in all_sensors
+ all_sensors[sensor_class_name] = func
+ return func
+ return closure
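+
+# Usage sketch (illustrative, hypothetical sensor name): a sensor function
+# registers itself under a sensor class name and is later looked up
+# through all_sensors:
+#
+#   @provides("my-sensor")
+#   def my_stat(disallowed_prefixes=None, allowed_prefixes=None):
+#       return {}
+#
+#   assert all_sensors["my-sensor"] is my_stat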
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
new file mode 100644
index 0000000..3753e7c
--- /dev/null
+++ b/wally/sensors/main.py
@@ -0,0 +1,118 @@
+import os
+import sys
+import time
+import json
+import glob
+import signal
+import os.path
+import argparse
+
+from .sensors.utils import SensorInfo
+from .daemonize import Daemonize
+from .discover import all_sensors
+from .protocol import create_protocol
+
+
+# load all sensors
+from . import sensors
+sensors_dir = os.path.dirname(sensors.__file__)
+for fname in glob.glob(os.path.join(sensors_dir, "*.py")):
+ mod_name = os.path.basename(fname[:-3])
+ __import__("sensors.sensors." + mod_name)
+
+
+def get_values(required_sensors):
+ result = {}
+ for sensor_name, params in required_sensors:
+ if sensor_name in all_sensors:
+ result.update(all_sensors[sensor_name](**params))
+ else:
+ msg = "Sensor {0!r} isn't available".format(sensor_name)
+ raise ValueError(msg)
+ return time.time(), result
+
+
+def parse_args(args):
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-d', '--daemon',
+ choices=('start', 'stop', 'status'),
+ default=None)
+
+ parser.add_argument('-u', '--url', default='stdout://')
+ parser.add_argument('-t', '--timeout', type=float, default=1)
+ parser.add_argument('-l', '--list-sensors', action='store_true')
+ parser.add_argument('sensors_config', type=argparse.FileType('r'),
+ default=None, nargs='?')
+ return parser.parse_args(args[1:])
+
+
+def daemon_main(required_sensors, opts):
+ sender = create_protocol(opts.url)
+ prev = {}
+
+ while True:
+ gtime, data = get_values(required_sensors.items())
+ curr = {'time': SensorInfo(gtime, True)}
+ for name, val in data.items():
+ if val.is_accumulated:
+ if name in prev:
+ curr[name] = SensorInfo(val.value - prev[name], True)
+ prev[name] = val.value
+ else:
+ curr[name] = SensorInfo(val.value, False)
+ sender.send(curr)
+ time.sleep(opts.timeout)
+
+
+def pid_running(pid):
+ return os.path.exists("/proc/" + str(pid))
+
+
+def main(argv):
+ opts = parse_args(argv)
+
+ if opts.list_sensors:
+ print "\n".join(sorted(all_sensors))
+ return 0
+
+ if opts.daemon is not None:
+ pid_file = "/tmp/sensors.pid"
+ if opts.daemon == 'start':
+ required_sensors = json.loads(opts.sensors_config.read())
+
+ def root_func():
+ daemon_main(required_sensors, opts)
+
+ daemon = Daemonize(app="perfcollect_app",
+ pid=pid_file,
+ action=root_func)
+ daemon.start()
+ elif opts.daemon == 'stop':
+ if os.path.isfile(pid_file):
+ pid = int(open(pid_file).read())
+ if pid_running(pid):
+ os.kill(pid, signal.SIGTERM)
+
+ time.sleep(0.1)
+
+ if pid_running(pid):
+ os.kill(pid, signal.SIGKILL)
+
+ if os.path.isfile(pid_file):
+ os.unlink(pid_file)
+ elif opts.daemon == 'status':
+ if os.path.isfile(pid_file):
+ pid = int(open(pid_file).read())
+ if pid_running(pid):
+ print "running"
+ return
+ print "stopped"
+ else:
+ raise ValueError("Unknown daemon operation {}".format(opts.daemon))
+ else:
+ required_sensors = json.loads(opts.sensors_config.read())
+ daemon_main(required_sensors, opts)
+ return 0
+
+if __name__ == "__main__":
+ exit(main(sys.argv))
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
new file mode 100644
index 0000000..c2ace01
--- /dev/null
+++ b/wally/sensors/protocol.py
@@ -0,0 +1,192 @@
+import sys
+import time
+import socket
+import select
+import cPickle as pickle
+from urlparse import urlparse
+
+from . import cp_transport
+
+
+class Timeout(Exception):
+ pass
+
+
+# ------------------------------------- Serializers --------------------------
+
+
+class ISensortResultsSerializer(object):
+ def pack(self, data):
+ pass
+
+ def unpack(self, data):
+ pass
+
+
+class PickleSerializer(ISensortResultsSerializer):
+ def pack(self, data):
+ ndata = {key: val.value for key, val in data.items()}
+ return pickle.dumps(ndata)
+
+ def unpack(self, data):
+ return pickle.loads(data)
+
+try:
+ # try to use full-function lib
+ import msgpack
+
+ class mgspackSerializer(ISensortResultsSerializer):
+ def pack(self, data):
+ return msgpack.packb(data)
+
+ def unpack(self, data):
+ return msgpack.unpackb(data)
+
+ MSGPackSerializer = mgspackSerializer
+except ImportError:
+ # use local lib, if failed import
+ import umsgpack
+
+ class umsgspackSerializer(ISensortResultsSerializer):
+ def pack(self, data):
+ return umsgpack.packb(data)
+
+ def unpack(self, data):
+ return umsgpack.unpackb(data)
+
+ MSGPackSerializer = umsgspackSerializer
+
+# ------------------------------------- Transports ---------------------------
+
+
+class ITransport(object):
+ def __init__(self, receiver):
+ pass
+
+ def send(self, data):
+ pass
+
+ def recv(self, timeout=None):
+ pass
+
+
+class StdoutTransport(ITransport):
+ MIN_COL_WIDTH = 10
+
+ def __init__(self, receiver, delta=True):
+ if receiver:
+ cname = self.__class__.__name__
+ raise ValueError("{0} don't allows receiving".format(cname))
+
+ self.headers = None
+ self.line_format = ""
+ self.prev = {}
+ self.delta = delta
+ self.fd = sys.stdout
+
+ def send(self, data):
+ if self.headers is None:
+ self.headers = sorted(data)
+
+ for pos, header in enumerate(self.headers):
+ self.line_format += "{%s:>%s}" % (pos,
+ max(len(header) + 1,
+ self.MIN_COL_WIDTH))
+
+ print self.line_format.format(*self.headers)
+
+ if self.delta:
+ vals = [data[header].value - self.prev.get(header, 0)
+ for header in self.headers]
+
+ self.prev.update({header: data[header].value
+ for header in self.headers})
+ else:
+ vals = [data[header].value for header in self.headers]
+
+ self.fd.write(self.line_format.format(*vals) + "\n")
+
+ def recv(self, timeout=None):
+ cname = self.__class__.__name__
+ raise ValueError("{0} don't allows receiving".format(cname))
+
+
+class FileTransport(StdoutTransport):
+ def __init__(self, receiver, fname, delta=True):
+ StdoutTransport.__init__(self, receiver, delta)
+ self.fd = open(fname, "w")
+
+
+class UDPTransport(ITransport):
+ def __init__(self, receiver, ip, port, packer_cls):
+ self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ if receiver:
+ self.port.bind((ip, port))
+ self.packer_cls = packer_cls
+ self.packers = {}
+ else:
+ self.packer = packer_cls()
+ self.dst = (ip, port)
+
+ def send(self, data):
+ raw_data = self.packer.pack(data)
+ self.port.sendto(raw_data, self.dst)
+
+ def recv(self, timeout=None):
+ r, _, _ = select.select([self.port], [], [], timeout)
+ if len(r) != 0:
+ raw_data, addr = self.port.recvfrom(10000)
+ packer = self.packers.setdefault(addr, self.packer_cls())
+ return addr, packer.unpack(raw_data)
+ else:
+ raise Timeout()
+
+
+class HugeUDPTransport(ITransport, cp_transport.Sender):
+ def __init__(self, receiver, ip, port, packer_cls):
+ cp_transport.Sender.__init__(self, packer_cls, port=port, host=ip)
+ if receiver:
+ self.bind()
+
+ def send(self, data):
+ self.send_by_protocol(data)
+
+ def recv(self, timeout=None):
+ begin = time.time()
+
+ while True:
+
+ try:
+ # return not None, if packet is ready
+ ready = self.recv_by_protocol()
+ # if data ready - return it
+ if ready is not None:
+ return ready
+ # if data not ready - check if it's time to die
+ if time.time() - begin >= timeout:
+ break
+
+ except cp_transport.Timeout:
+ # no answer yet - check if the timeout expired
+ if time.time() - begin >= timeout:
+ break
+# -------------------------- Factory function --------------------------------
+
+
+def create_protocol(uri, receiver=False):
+ parsed_uri = urlparse(uri)
+ if parsed_uri.scheme == 'stdout':
+ return StdoutTransport(receiver)
+ elif parsed_uri.scheme == 'udp':
+ ip, port = parsed_uri.netloc.split(":")
+ return UDPTransport(receiver, ip=ip, port=int(port),
+ packer_cls=PickleSerializer)
+ elif parsed_uri.scheme == 'file':
+ return FileTransport(receiver, parsed_uri.path)
+ elif parsed_uri.scheme == 'hugeudp':
+ ip, port = parsed_uri.netloc.split(":")
+ return HugeUDPTransport(receiver, ip=ip, port=int(port),
+ packer_cls=MSGPackSerializer)
+ else:
+ templ = "Can't instantiate transport from {0!r}"
+ raise ValueError(templ.format(uri))
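+
+# Examples of accepted URIs (illustrative):
+#
+#   create_protocol("stdout://")                          # table to stdout
+#   create_protocol("file:///tmp/sensors.txt")            # table to a file
+#   create_protocol("udp://127.0.0.1:12001")              # pickle over UDP
+#   create_protocol("udp://127.0.0.1:12001", receiver=True)  # bind and receive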
diff --git a/wally/sensors/receiver.py b/wally/sensors/receiver.py
new file mode 100644
index 0000000..ff0f223
--- /dev/null
+++ b/wally/sensors/receiver.py
@@ -0,0 +1,19 @@
+from .api import start_monitoring, Empty
+# from influx_exporter import connect, add_data
+
+uri = "udp://192.168.0.104:12001"
+# infldb_url = "influxdb://perf:perf@192.168.152.42:8086/perf"
+# conn = connect(infldb_url)
+
+monitor_config = {'127.0.0.1':
+ {"block-io": {'allowed_prefixes': ['sda1', 'rbd1']},
+ "net-io": {"allowed_prefixes": ["virbr2"]}}}
+
+with start_monitoring(uri, monitor_config) as queue:
+ while True:
+ try:
+ (ip, port), data = queue.get(True, 1)
+ print (ip, port), data
+ # add_data(conn, ip, [data])
+ except Empty:
+ pass
diff --git a/wally/sensors/sensors/__init__.py b/wally/sensors/sensors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wally/sensors/sensors/__init__.py
diff --git a/wally/sensors/sensors/io_sensors.py b/wally/sensors/sensors/io_sensors.py
new file mode 100644
index 0000000..c9ff340
--- /dev/null
+++ b/wally/sensors/sensors/io_sensors.py
@@ -0,0 +1,72 @@
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
+
+# 1 - major number
+# 2 - minor number
+# 3 - device name
+# 4 - reads completed successfully
+# 5 - reads merged
+# 6 - sectors read
+# 7 - time spent reading (ms)
+# 8 - writes completed
+# 9 - writes merged
+# 10 - sectors written
+# 11 - time spent writing (ms)
+# 12 - I/Os currently in progress
+# 13 - time spent doing I/Os (ms)
+# 14 - weighted time spent doing I/Os (ms)
+
+io_values_pos = [
+ (3, 'reads_completed', True),
+ (5, 'sectors_read', True),
+ (6, 'rtime', True),
+ (7, 'writes_completed', True),
+ (9, 'sectors_written', True),
+ (10, 'wtime', True),
+ (11, 'io_queue', False),
+ (13, 'io_time', True)
+]
+
+
+@provides("block-io")
+def io_stat(disallowed_prefixes=('ram', 'loop'), allowed_prefixes=None):
+ results = {}
+ for line in open('/proc/diskstats'):
+ vals = line.split()
+ dev_name = vals[2]
+
+ dev_ok = is_dev_accepted(dev_name,
+ disallowed_prefixes,
+ allowed_prefixes)
+
+ if dev_ok:
+ for pos, name, accum_val in io_values_pos:
+ sensor_name = "{0}.{1}".format(dev_name, name)
+ results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+ return results
+
+
+def get_latency(stat1, stat2):
+ disks = set(i.split('.', 1)[0] for i in stat1)
+ results = {}
+
+ for disk in disks:
+ rdc = disk + '.reads_completed'
+ wrc = disk + '.writes_completed'
+ rdt = disk + '.rtime'
+ wrt = disk + '.wtime'
+ lat = 0.0
+
+ io_ops1 = stat1[rdc].value + stat1[wrc].value
+ io_ops2 = stat2[rdc].value + stat2[wrc].value
+
+ diops = io_ops2 - io_ops1
+
+ if diops != 0:
+ io1 = stat1[rdt].value + stat1[wrt].value
+ io2 = stat2[rdt].value + stat2[wrt].value
+ lat = abs(float(io1 - io2)) / diops
+
+ results[disk + '.latence'] = SensorInfo(lat, False)
+
+ return results
diff --git a/wally/sensors/sensors/net_sensors.py b/wally/sensors/sensors/net_sensors.py
new file mode 100644
index 0000000..4a4e477
--- /dev/null
+++ b/wally/sensors/sensors/net_sensors.py
@@ -0,0 +1,43 @@
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
+
+# /proc/net/dev columns per interface (after the "iface:" prefix):
+# 0 - receive bytes
+# 1 - receive packets
+# 2 - receive errs
+# 3 - receive drop
+# 4 - receive fifo
+# 5 - receive frame
+# 6 - receive compressed
+# 7 - receive multicast
+# 8 - transmit bytes
+# 9 - transmit packets
+# (remaining columns: transmit errs, drop, fifo, colls, carrier, compressed)
+
+net_values_pos = [
+ (0, 'recv_bytes', True),
+ (1, 'recv_packets', True),
+ (8, 'send_bytes', True),
+ (9, 'send_packets', True),
+]
+
+
+@provides("net-io")
+def net_stat(disallowed_prefixes=('docker',), allowed_prefixes=None):
+ results = {}
+
+ for line in open('/proc/net/dev').readlines()[2:]:
+ dev_name, stats = line.split(":", 1)
+ dev_name = dev_name.strip()
+ vals = stats.split()
+
+ dev_ok = is_dev_accepted(dev_name,
+ disallowed_prefixes,
+ allowed_prefixes)
+ if dev_ok:
+ for pos, name, accum_val in net_values_pos:
+ sensor_name = "{0}.{1}".format(dev_name, name)
+ results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+ return results
diff --git a/wally/sensors/sensors/pscpu_sensors.py b/wally/sensors/sensors/pscpu_sensors.py
new file mode 100644
index 0000000..cffdb71
--- /dev/null
+++ b/wally/sensors/sensors/pscpu_sensors.py
@@ -0,0 +1,37 @@
+import os
+
+from ..discover import provides
+from .utils import SensorInfo, get_pid_name, get_pid_list
+
+
+@provides("perprocess-cpu")
+def pscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
+ results = {}
+ pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
+
+ for pid in pid_list:
+ try:
+ dev_name = get_pid_name(pid)
+
+ pid_stat1 = pid_stat(pid)
+
+ sensor_name = "{0}.{1}".format(dev_name, pid)
+ results[sensor_name] = SensorInfo(pid_stat1, True)
+ except IOError:
+ # may be, proc has already terminated, skip it
+ continue
+ return results
+
+
+def pid_stat(pid):
+ """ Return total cpu usage time from process"""
+ # read /proc/pid/stat
+ with open(os.path.join('/proc/', pid, 'stat'), 'r') as pidfile:
+ proctimes = pidfile.readline().split()
+ # get utime from /proc/<pid>/stat, 14 item
+ utime = proctimes[13]
+ # get stime from proc/<pid>/stat, 15 item
+ stime = proctimes[14]
+ # count total process used time
+ proctotal = int(utime) + int(stime)
+ return float(proctotal)
diff --git a/wally/sensors/sensors/psram_sensors.py b/wally/sensors/sensors/psram_sensors.py
new file mode 100644
index 0000000..cbd85e6
--- /dev/null
+++ b/wally/sensors/sensors/psram_sensors.py
@@ -0,0 +1,76 @@
+from ..discover import provides
+from .utils import SensorInfo, get_pid_name, get_pid_list
+
+
+# Based on ps_mem.py:
+# Licence: LGPLv2
+# Author: P@draigBrady.com
+# Source: http://www.pixelbeat.org/scripts/ps_mem.py
+# http://github.com/pixelb/scripts/commits/master/scripts/ps_mem.py
+
+
+# Note shared is always a subset of rss (trs is not always)
+def get_mem_stats(pid):
+ """ Return memory data of pid in format (private, shared) """
+
+ fname = '/proc/{0}/{1}'.format(pid, "smaps")
+ lines = open(fname).readlines()
+
+ shared = 0
+ private = 0
+ pss = 0
+
+ # add 0.5 KiB per entry as the average error due to truncation
+ pss_adjust = 0.5
+
+ for line in lines:
+ if line.startswith("Shared"):
+ shared += int(line.split()[1])
+
+ if line.startswith("Private"):
+ private += int(line.split()[1])
+
+ if line.startswith("Pss"):
+ pss += float(line.split()[1]) + pss_adjust
+
+ # Note Shared + Private = Rss above
+ # The Rss in smaps includes video card mem etc.
+
+ if pss != 0:
+ shared = int(pss - private)
+
+ return (private, shared)
+
+
+@provides("perprocess-ram")
+def psram_stat(disallowed_prefixes=None, allowed_prefixes=None):
+ results = {}
+ pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
+ for pid in pid_list:
+ try:
+ dev_name = get_pid_name(pid)
+
+ private, shared = get_mem_stats(pid)
+ total = private + shared
+ sys_total = get_ram_size()
+ usage = float(total) / float(sys_total)
+
+ sensor_name = "{0}.{1}".format(dev_name, pid)
+
+ results[sensor_name + ".private_mem"] = SensorInfo(private, False)
+ results[sensor_name + ".shared_mem"] = SensorInfo(shared, False)
+ results[sensor_name + ".used_mem"] = SensorInfo(total, False)
+ name = sensor_name + ".mem_usage_percent"
+ results[name] = SensorInfo(usage * 100, False)
+ except IOError:
+ # permission denied or proc die
+ continue
+ return results
+
+
+def get_ram_size():
+ """ Return RAM size in Kb"""
+ with open("/proc/meminfo") as proc:
+ mem_total = proc.readline().split()
+ return mem_total[1]
diff --git a/wally/sensors/sensors/syscpu_sensors.py b/wally/sensors/sensors/syscpu_sensors.py
new file mode 100644
index 0000000..d3da02b
--- /dev/null
+++ b/wally/sensors/sensors/syscpu_sensors.py
@@ -0,0 +1,40 @@
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
+
+# 0 - cpu name
+# 1 - user: normal processes executing in user mode
+# 2 - nice: niced processes executing in user mode
+# 3 - system: processes executing in kernel mode
+# 4 - idle: twiddling thumbs
+# 5 - iowait: waiting for I/O to complete
+# 6 - irq: servicing interrupts
+# 7 - softirq: servicing softirqs
+
+io_values_pos = [
+ (1, 'user_processes', True),
+ (2, 'nice_processes', True),
+ (3, 'system_processes', True),
+ (4, 'idle_time', True),
+]
+
+
+@provides("system-cpu")
+def syscpu_stat(disallowed_prefixes=('intr', 'ctxt', 'btime', 'processes',
+ 'procs_running', 'procs_blocked', 'softirq'),
+ allowed_prefixes=None):
+ results = {}
+
+ for line in open('/proc/stat'):
+ vals = line.split()
+ dev_name = vals[0]
+
+ dev_ok = is_dev_accepted(dev_name,
+ disallowed_prefixes,
+ allowed_prefixes)
+
+ if dev_ok:
+ for pos, name, accum_val in io_values_pos:
+ sensor_name = "{0}.{1}".format(dev_name, name)
+ results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+ return results
+
diff --git a/wally/sensors/sensors/sysram_sensors.py b/wally/sensors/sensors/sysram_sensors.py
new file mode 100644
index 0000000..c78eddd
--- /dev/null
+++ b/wally/sensors/sensors/sysram_sensors.py
@@ -0,0 +1,34 @@
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
+
+
+# report these fields unless allowed_prefixes overrides them
+ram_fields = [
+ 'MemTotal',
+ 'MemFree',
+ 'Buffers',
+ 'Cached',
+ 'SwapCached',
+ 'Dirty',
+ 'Writeback',
+ 'SwapTotal',
+ 'SwapFree'
+]
+
+
+@provides("system-ram")
+def sysram_stat(disallowed_prefixes=None, allowed_prefixes=None):
+ if allowed_prefixes is None:
+ allowed_prefixes = ram_fields
+ results = {}
+ for line in open('/proc/meminfo'):
+ vals = line.split()
+ dev_name = vals[0]
+
+ dev_ok = is_dev_accepted(dev_name,
+ disallowed_prefixes,
+ allowed_prefixes)
+
+ if dev_ok:
+ results[dev_name] = SensorInfo(int(vals[1]), False)
+ return results
diff --git a/wally/sensors/sensors/utils.py b/wally/sensors/sensors/utils.py
new file mode 100644
index 0000000..ad08676
--- /dev/null
+++ b/wally/sensors/sensors/utils.py
@@ -0,0 +1,83 @@
+import os
+
+from collections import namedtuple
+
+SensorInfo = namedtuple("SensorInfo", ['value', 'is_accumulated'])
+
+
+def is_dev_accepted(name, disallowed_prefixes, allowed_prefixes):
+ dev_ok = True
+
+ if disallowed_prefixes is not None:
+ dev_ok = all(not name.startswith(prefix)
+ for prefix in disallowed_prefixes)
+
+ if dev_ok and allowed_prefixes is not None:
+ dev_ok = any(name.startswith(prefix)
+ for prefix in allowed_prefixes)
+
+ return dev_ok
+
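+# Illustrative examples (not part of the original code):
+#   is_dev_accepted("sda1", ("ram", "loop"), None) -> True
+#   is_dev_accepted("loop0", ("ram", "loop"), None) -> False
+#   is_dev_accepted("rbd1", None, ("rbd",)) -> True
+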
+
+def get_pid_list(disallowed_prefixes, allowed_prefixes):
+ """ Return pid list from list of pids and names """
+ # exceptions
+ but = disallowed_prefixes if disallowed_prefixes is not None else []
+ if allowed_prefixes is None:
+ # if nothing is set, return all pids except the disallowed ones
+ result = [pid
+ for pid in os.listdir('/proc')
+ if pid.isdigit() and pid not in but]
+ else:
+ result = []
+ for pid in os.listdir('/proc'):
+ if pid.isdigit() and pid not in but:
+ name = get_pid_name(pid)
+ if pid in allowed_prefixes or \
+ any(name.startswith(val) for val in allowed_prefixes):
+ # this pid or its name is allowed
+ result.append(pid)
+ return result
+
+
+def get_pid_name(pid):
+ """ Return name by pid """
+ try:
+ with open(os.path.join('/proc/', pid, 'cmdline'), 'r') as pidfile:
+ try:
+ cmd = pidfile.readline().split()[0]
+ return os.path.basename(cmd).rstrip('\x00')
+ except IndexError:
+ # no cmd returned
+ return "<NO NAME>"
+ except IOError:
+ # callers expect a string even if the proc entry could not be read
+ return "no_such_process"
+
+
+def delta(func, only_upd=True):
+ prev = {}
+ while True:
+ for dev_name, vals in func():
+ if dev_name not in prev:
+ prev[dev_name] = {}
+ for name, (val, _) in vals.items():
+ prev[dev_name][name] = val
+ else:
+ dev_prev = prev[dev_name]
+ res = {}
+ for stat_name, (val, accum_val) in vals.items():
+ if accum_val:
+ if stat_name in dev_prev:
+ delta = int(val) - int(dev_prev[stat_name])
+ if not only_upd or 0 != delta:
+ res[stat_name] = str(delta)
+ dev_prev[stat_name] = val
+ elif not only_upd or '0' != val:
+ res[stat_name] = val
+
+ if only_upd and len(res) == 0:
+ continue
+ yield dev_name, res
+ yield None, None
diff --git a/wally/sensors/storage/__init__.py b/wally/sensors/storage/__init__.py
new file mode 100644
index 0000000..d7bf6aa
--- /dev/null
+++ b/wally/sensors/storage/__init__.py
@@ -0,0 +1,104 @@
+import struct
+
+
+def pack(val, tp=True):
+ # 'i' - 16-bit unsigned int, 'd' - dict of int -> int, 's' - short string
+ if isinstance(val, int):
+ assert 0 <= val < 2 ** 16
+
+ if tp:
+ res = 'i'
+ else:
+ res = ""
+
+ res += struct.pack("!H", val)
+ elif isinstance(val, dict):
+ assert len(val) < 2 ** 16
+ if tp:
+ res = "d"
+ else:
+ res = ""
+
+ res += struct.pack("!H", len(val))
+ for k, v in val.items():
+ assert 0 <= k < 2 ** 16
+ assert 0 <= v < 2 ** 32
+ res += struct.pack("!HI", k, v)
+ elif isinstance(val, str):
+ assert len(val) < 256
+ if tp:
+ res = "s"
+ else:
+ res = ""
+ res += chr(len(val)) + val
+ else:
+ raise ValueError()
+
+ return res
+
+
+def unpack(fd, tp=None):
+ if tp is None:
+ tp = fd.read(1)
+
+ if tp == 'i':
+ return struct.unpack("!H", fd.read(2))[0]
+ elif tp == 'd':
+ res = {}
+ val_len = struct.unpack("!H", fd.read(2))[0]
+ for _ in range(val_len):
+ k, v = struct.unpack("!HI", fd.read(6))
+ res[k] = v
+ return res
+ elif tp == 's':
+ val_len = ord(fd.read(1))
+ return fd.read(val_len)
+
+ raise ValueError()
+
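+# Round-trip sketch (illustrative, Python 2), assuming the format codes
+# above ('!H' for 16-bit ints, one length byte for strings):
+#
+#   import StringIO
+#   buf = StringIO.StringIO(pack(42) + pack({1: 2}) + pack("abc"))
+#   assert unpack(buf) == 42
+#   assert unpack(buf) == {1: 2}
+#   assert unpack(buf) == "abc"
+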
+
+class LocalStorage(object):
+ NEW_DATA = 0
+ NEW_SENSOR = 1
+ NEW_SOURCE = 2
+
+ def __init__(self, fd):
+ self.fd = fd
+ self.sensor_ids = {}
+ self.sources_ids = {}
+ self.max_source_id = 0
+ self.max_sensor_id = 0
+
+ def add_data(self, source, sensor_values):
+ source_id = self.sources_ids.get(source)
+ if source_id is None:
+ source_id = self.max_source_id
+ self.sources_ids[source] = source_id
+ self.emit(self.NEW_SOURCE, source_id, source)
+ self.max_source_id += 1
+
+ new_sensor_values = {}
+
+ for name, val in sensor_values.items():
+ sensor_id = self.sensor_ids.get(name)
+ if sensor_id is None:
+ sensor_id = self.max_sensor_id
+ self.sensor_ids[name] = sensor_id
+ self.emit(self.NEW_SENSOR, sensor_id, name)
+ self.max_sensor_id += 1
+ new_sensor_values[sensor_id] = val
+
+ self.emit(self.NEW_DATA, source_id, new_sensor_values)
+
+ def emit(self, tp, v1, v2):
+ self.fd.write(chr(tp) + pack(v1, False) + pack(v2))
+
+ def readall(self):
+ tp = self.fd.read(1)
+ if ord(tp) == self.NEW_DATA:
+ pass
+ elif ord(tp) == self.NEW_SENSOR:
+ pass
+ elif ord(tp) == self.NEW_SOURCE:
+ pass
+ else:
+ raise ValueError()
diff --git a/wally/sensors/storage/grafana.py b/wally/sensors/storage/grafana.py
new file mode 100644
index 0000000..9823fac
--- /dev/null
+++ b/wally/sensors/storage/grafana.py
@@ -0,0 +1,47 @@
+import json
+
+
+query = """
+select value from "{series}"
+where $timeFilter and
+host='{host}' and device='{device}'
+order asc
+"""
+
+
+def make_dashboard_file(config):
+ series = ['writes_completed', 'sectors_written']
+ dashboards = []
+
+ for serie in series:
+ dashboard = dict(title=serie, type='graph',
+ span=12, fill=1, linewidth=2,
+ tooltip={'shared': True})
+
+ targets = []
+
+ for ip, devs in config.items():
+ for device in devs:
+ params = {
+ 'series': serie,
+ 'host': ip,
+ 'device': device
+ }
+
+ target = dict(
+ target="disk io",
+ query=query.replace("\n", " ").format(**params).strip(),
+ interval="",
+ alias="{0} io {1}".format(ip, device),
+ rawQuery=True
+ )
+ targets.append(target)
+
+ dashboard['targets'] = targets
+ dashboards.append(dashboard)
+
+ fc = open("grafana_template.js").read()
+ return fc % (json.dumps(dashboards),)
+
+
+if __name__ == "__main__":
+ print make_dashboard_file({'192.168.0.104': ['sda1', 'rbd1']})
diff --git a/wally/sensors/storage/grafana_template.js b/wally/sensors/storage/grafana_template.js
new file mode 100644
index 0000000..7c57924
--- /dev/null
+++ b/wally/sensors/storage/grafana_template.js
@@ -0,0 +1,46 @@
+/* global _ */
+
+/*
+ * Complex scripted dashboard
+ * This script generates a dashboard object that Grafana can load. It also takes a number of user
+ * supplied URL parameters (int ARGS variable)
+ *
+ * Return a dashboard object, or a function
+ *
+ * For async scripts, return a function, this function must take a single callback function as argument,
+ * call this callback function with the dashboard object (look at scripted_async.js for an example)
+ */
+
+
+
+// accessible variables in this scope
+var window, document, ARGS, $, jQuery, moment, kbn;
+
+// Setup some variables
+var dashboard;
+
+// All url parameters are available via the ARGS object
+var ARGS;
+
+// Initialize a skeleton with nothing but a rows array and service object
+dashboard = {rows : []};
+
+// Set a title
+dashboard.title = 'Tests dash';
+
+// Set default time
+// time can be overriden in the url using from/to parameteres, but this is
+// handled automatically in grafana core during dashboard initialization
+dashboard.time = {
+ from: "now-5m",
+ to: "now"
+};
+
+dashboard.rows.push({
+ title: 'Chart',
+ height: '300px',
+ panels: %s
+});
+
+
+return dashboard;
diff --git a/wally/sensors/storage/influx_exporter.py b/wally/sensors/storage/influx_exporter.py
new file mode 100644
index 0000000..34b3c0a
--- /dev/null
+++ b/wally/sensors/storage/influx_exporter.py
@@ -0,0 +1,31 @@
+from urlparse import urlparse
+from influxdb import InfluxDBClient
+
+
+def connect(url):
+ parsed_url = urlparse(url)
+ user_passwd, host_port = parsed_url.netloc.rsplit("@", 1)
+ user, passwd = user_passwd.split(":", 1)
+ host, port = host_port.split(":")
+ return InfluxDBClient(host, int(port), user, passwd, parsed_url.path[1:])
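+# e.g. (illustrative) connect("influxdb://perf:perf@192.168.152.42:8086/perf")
+# connects as user "perf" to database "perf" on 192.168.152.42:8086.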
+
+
+def add_data(conn, hostname, data):
+ per_sensor_data = {}
+ for serie in data:
+ serie = serie.copy()
+ gtime = serie.pop('time')
+ for key, val in serie.items():
+ dev, sensor = key.split('.')
+ data = per_sensor_data.setdefault(sensor, [])
+ data.append([gtime, hostname, dev, val])
+
+ infl_data = []
+ columns = ['time', 'host', 'device', 'value']
+ for sensor_name, points in per_sensor_data.items():
+ infl_data.append(
+ {'columns': columns,
+ 'name': sensor_name,
+ 'points': points})
+
+ conn.write_points(infl_data)
diff --git a/wally/sensors/storage/koder.js b/wally/sensors/storage/koder.js
new file mode 100644
index 0000000..a65a454
--- /dev/null
+++ b/wally/sensors/storage/koder.js
@@ -0,0 +1,47 @@
+/* global _ */
+
+/*
+ * Complex scripted dashboard
+ * This script generates a dashboard object that Grafana can load. It also takes a number of user
+ * supplied URL parameters (in the ARGS variable)
+ *
+ * Return a dashboard object, or a function
+ *
+ * For async scripts, return a function, this function must take a single callback function as argument,
+ * call this callback function with the dashboard object (look at scripted_async.js for an example)
+ */
+
+
+
+// accessible variables in this scope
+var window, document, ARGS, $, jQuery, moment, kbn;
+
+// Setup some variables
+var dashboard;
+
+// All url parameters are available via the ARGS object
+var ARGS;
+
+// Initialize a skeleton with nothing but a rows array and service object
+dashboard = {rows : []};
+
+// Set a title
+dashboard.title = 'Tests dash';
+
+// Set default time
+// time can be overridden in the url using from/to parameters, but this is
+// handled automatically in grafana core during dashboard initialization
+dashboard.time = {
+ from: "now-5m",
+ to: "now"
+};
+
+dashboard.rows.push({
+ title: 'Chart',
+ height: '300px',
+ panels: [{"span": 12, "title": "writes_completed", "linewidth": 2, "type": "graph", "targets": [{"alias": "192.168.0.104 io sda1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"writes_completed\" where $timeFilter and host='192.168.0.104' and device='sda1' order asc"}, {"alias": "192.168.0.104 io rbd1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"writes_completed\" where $timeFilter and host='192.168.0.104' and device='rbd1' order asc"}], "tooltip": {"shared": true}, "fill": 1}, {"span": 12, "title": "sectors_written", "linewidth": 2, "type": "graph", "targets": [{"alias": "192.168.0.104 io sda1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"sectors_written\" where $timeFilter and host='192.168.0.104' and device='sda1' order asc"}, {"alias": "192.168.0.104 io rbd1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"sectors_written\" where $timeFilter and host='192.168.0.104' and device='rbd1' order asc"}], "tooltip": {"shared": true}, "fill": 1}]
+});
+
+
+return dashboard;
+
diff --git a/wally/sensors/umsgpack.py b/wally/sensors/umsgpack.py
new file mode 100644
index 0000000..0cdc83e
--- /dev/null
+++ b/wally/sensors/umsgpack.py
@@ -0,0 +1,879 @@
+# u-msgpack-python v2.0 - vsergeev at gmail
+# https://github.com/vsergeev/u-msgpack-python
+#
+# u-msgpack-python is a lightweight MessagePack serializer and deserializer
+# module, compatible with both Python 2 and 3, as well as CPython and PyPy
+# implementations of Python. u-msgpack-python is fully compliant with the
+# latest MessagePack specification
+# (https://github.com/msgpack/msgpack/blob/master/spec.md). In
+# particular, it supports the new binary, UTF-8 string, and application ext
+# types.
+#
+# MIT License
+#
+# Copyright (c) 2013-2014 Ivan A. Sergeev
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+"""
+u-msgpack-python v2.0 - vsergeev at gmail
+https://github.com/vsergeev/u-msgpack-python
+
+u-msgpack-python is a lightweight MessagePack serializer and deserializer
+module, compatible with both Python 2 and 3, as well as CPython and PyPy
+implementations of Python. u-msgpack-python is fully compliant with the
+latest MessagePack specification
+(https://github.com/msgpack/msgpack/blob/master/spec.md). In
+particular, it supports the new binary, UTF-8 string, and application ext
+types.
+
+License: MIT
+"""
+
+__version__ = "2.0"
+"Module version string"
+
+version = (2,0)
+"Module version tuple"
+
+import struct
+import collections
+import sys
+import io
+
+################################################################################
+### Ext Class
+################################################################################
+
+# Extension type for application-defined types and data
+class Ext:
+ """
+ The Ext class facilitates creating a serializable extension object to store
+ an application-defined type and data byte array.
+ """
+
+ def __init__(self, type, data):
+ """
+ Construct a new Ext object.
+
+ Args:
+ type: application-defined type integer from 0 to 127
+ data: application-defined data byte array
+
+ Raises:
+ TypeError:
+ Specified ext type is outside of 0 to 127 range.
+
+ Example:
+ >>> foo = umsgpack.Ext(0x05, b"\x01\x02\x03")
+ >>> umsgpack.packb({u"special stuff": foo, u"awesome": True})
+ '\x82\xa7awesome\xc3\xadspecial stuff\xc7\x03\x05\x01\x02\x03'
+ >>> bar = umsgpack.unpackb(_)
+ >>> print(bar["special stuff"])
+ Ext Object (Type: 0x05, Data: 01 02 03)
+ >>>
+ """
+ # Application ext type should be 0 <= type <= 127
+ if not isinstance(type, int) or not (type >= 0 and type <= 127):
+ raise TypeError("ext type out of range")
+ # Check data is type bytes
+ elif sys.version_info[0] == 3 and not isinstance(data, bytes):
+ raise TypeError("ext data is not type \'bytes\'")
+ elif sys.version_info[0] == 2 and not isinstance(data, str):
+ raise TypeError("ext data is not type \'str\'")
+ self.type = type
+ self.data = data
+
+ def __eq__(self, other):
+ """
+ Compare this Ext object with another for equality.
+ """
+ return (isinstance(other, self.__class__) and
+ self.type == other.type and
+ self.data == other.data)
+
+ def __ne__(self, other):
+ """
+ Compare this Ext object with another for inequality.
+ """
+ return not self.__eq__(other)
+
+ def __str__(self):
+ """
+ String representation of this Ext object.
+ """
+ s = "Ext Object (Type: 0x%02x, Data: " % self.type
+ for i in range(min(len(self.data), 8)):
+ if i > 0:
+ s += " "
+ if isinstance(self.data[i], int):
+ s += "%02x" % (self.data[i])
+ else:
+ s += "%02x" % ord(self.data[i])
+ if len(self.data) > 8:
+ s += " ..."
+ s += ")"
+ return s
+
+################################################################################
+### Exceptions
+################################################################################
+
+# Base Exception classes
+class PackException(Exception):
+ "Base class for exceptions encountered during packing."
+ pass
+class UnpackException(Exception):
+ "Base class for exceptions encountered during unpacking."
+ pass
+
+# Packing error
+class UnsupportedTypeException(PackException):
+ "Object type not supported for packing."
+ pass
+
+# Unpacking error
+class InsufficientDataException(UnpackException):
+ "Insufficient data to unpack the encoded object."
+ pass
+class InvalidStringException(UnpackException):
+ "Invalid UTF-8 string encountered during unpacking."
+ pass
+class ReservedCodeException(UnpackException):
+ "Reserved code encountered during unpacking."
+ pass
+class UnhashableKeyException(UnpackException):
+ """
+ Unhashable key encountered during map unpacking.
+ The serialized map cannot be deserialized into a Python dictionary.
+ """
+ pass
+class DuplicateKeyException(UnpackException):
+ "Duplicate key encountered during map unpacking."
+ pass
+
+# Backwards compatibility
+KeyNotPrimitiveException = UnhashableKeyException
+KeyDuplicateException = DuplicateKeyException
+
+################################################################################
+### Exported Functions and Globals
+################################################################################
+
+# Exported functions and variables, set up in __init()
+pack = None
+packb = None
+unpack = None
+unpackb = None
+dump = None
+dumps = None
+load = None
+loads = None
+
+compatibility = False
+"""
+Compatibility mode boolean.
+
+When compatibility mode is enabled, u-msgpack-python will serialize both
+unicode strings and bytes into the old "raw" msgpack type, and deserialize the
+"raw" msgpack type into bytes. This provides backwards compatibility with the
+old MessagePack specification.
+
+Example:
+>>> umsgpack.compatibility = True
+>>>
+>>> umsgpack.packb([u"some string", b"some bytes"])
+b'\x92\xabsome string\xaasome bytes'
+>>> umsgpack.unpackb(_)
+[b'some string', b'some bytes']
+>>>
+"""
+
+################################################################################
+### Packing
+################################################################################
+
+# You may notice struct.pack("B", obj) instead of the simpler chr(obj) in the
+# code below. This is to allow for seamless Python 2 and 3 compatibility, as
+# chr(obj) has a str return type instead of bytes in Python 3, and
+# struct.pack(...) has the right return type in both versions.
+
+def _pack_integer(obj, fp):
+ if obj < 0:
+ if obj >= -32:
+ fp.write(struct.pack("b", obj))
+ elif obj >= -2**(8-1):
+ fp.write(b"\xd0" + struct.pack("b", obj))
+ elif obj >= -2**(16-1):
+ fp.write(b"\xd1" + struct.pack(">h", obj))
+ elif obj >= -2**(32-1):
+ fp.write(b"\xd2" + struct.pack(">i", obj))
+ elif obj >= -2**(64-1):
+ fp.write(b"\xd3" + struct.pack(">q", obj))
+ else:
+ raise UnsupportedTypeException("huge signed int")
+ else:
+ if obj <= 127:
+ fp.write(struct.pack("B", obj))
+ elif obj <= 2**8-1:
+ fp.write(b"\xcc" + struct.pack("B", obj))
+ elif obj <= 2**16-1:
+ fp.write(b"\xcd" + struct.pack(">H", obj))
+ elif obj <= 2**32-1:
+ fp.write(b"\xce" + struct.pack(">I", obj))
+ elif obj <= 2**64-1:
+ fp.write(b"\xcf" + struct.pack(">Q", obj))
+ else:
+ raise UnsupportedTypeException("huge unsigned int")
+
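+# _pack_integer above picks the shortest MessagePack encoding for the value,
+# e.g. (illustrative): packb(5) -> b'\x05' (positive fixint),
+# packb(300) -> b'\xcd\x01\x2c' (uint 16), packb(-5) -> b'\xfb' (negative fixint).
+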
+def _pack_nil(obj, fp):
+ fp.write(b"\xc0")
+
+def _pack_boolean(obj, fp):
+ fp.write(b"\xc3" if obj else b"\xc2")
+
+def _pack_float(obj, fp):
+ if _float_size == 64:
+ fp.write(b"\xcb" + struct.pack(">d", obj))
+ else:
+ fp.write(b"\xca" + struct.pack(">f", obj))
+
+def _pack_string(obj, fp):
+ obj = obj.encode('utf-8')
+ if len(obj) <= 31:
+ fp.write(struct.pack("B", 0xa0 | len(obj)) + obj)
+ elif len(obj) <= 2**8-1:
+ fp.write(b"\xd9" + struct.pack("B", len(obj)) + obj)
+ elif len(obj) <= 2**16-1:
+ fp.write(b"\xda" + struct.pack(">H", len(obj)) + obj)
+ elif len(obj) <= 2**32-1:
+ fp.write(b"\xdb" + struct.pack(">I", len(obj)) + obj)
+ else:
+ raise UnsupportedTypeException("huge string")
+
+def _pack_binary(obj, fp):
+ if len(obj) <= 2**8-1:
+ fp.write(b"\xc4" + struct.pack("B", len(obj)) + obj)
+ elif len(obj) <= 2**16-1:
+ fp.write(b"\xc5" + struct.pack(">H", len(obj)) + obj)
+ elif len(obj) <= 2**32-1:
+ fp.write(b"\xc6" + struct.pack(">I", len(obj)) + obj)
+ else:
+ raise UnsupportedTypeException("huge binary string")
+
+def _pack_oldspec_raw(obj, fp):
+ if len(obj) <= 31:
+ fp.write(struct.pack("B", 0xa0 | len(obj)) + obj)
+ elif len(obj) <= 2**16-1:
+ fp.write(b"\xda" + struct.pack(">H", len(obj)) + obj)
+ elif len(obj) <= 2**32-1:
+ fp.write(b"\xdb" + struct.pack(">I", len(obj)) + obj)
+ else:
+ raise UnsupportedTypeException("huge raw string")
+
+def _pack_ext(obj, fp):
+ if len(obj.data) == 1:
+ fp.write(b"\xd4" + struct.pack("B", obj.type & 0xff) + obj.data)
+ elif len(obj.data) == 2:
+ fp.write(b"\xd5" + struct.pack("B", obj.type & 0xff) + obj.data)
+ elif len(obj.data) == 4:
+ fp.write(b"\xd6" + struct.pack("B", obj.type & 0xff) + obj.data)
+ elif len(obj.data) == 8:
+ fp.write(b"\xd7" + struct.pack("B", obj.type & 0xff) + obj.data)
+ elif len(obj.data) == 16:
+ fp.write(b"\xd8" + struct.pack("B", obj.type & 0xff) + obj.data)
+ elif len(obj.data) <= 2**8-1:
+ fp.write(b"\xc7" + struct.pack("BB", len(obj.data), obj.type & 0xff) + obj.data)
+ elif len(obj.data) <= 2**16-1:
+ fp.write(b"\xc8" + struct.pack(">HB", len(obj.data), obj.type & 0xff) + obj.data)
+ elif len(obj.data) <= 2**32-1:
+ fp.write(b"\xc9" + struct.pack(">IB", len(obj.data), obj.type & 0xff) + obj.data)
+ else:
+ raise UnsupportedTypeException("huge ext data")
+
+def _pack_array(obj, fp):
+ if len(obj) <= 15:
+ fp.write(struct.pack("B", 0x90 | len(obj)))
+ elif len(obj) <= 2**16-1:
+ fp.write(b"\xdc" + struct.pack(">H", len(obj)))
+ elif len(obj) <= 2**32-1:
+ fp.write(b"\xdd" + struct.pack(">I", len(obj)))
+ else:
+ raise UnsupportedTypeException("huge array")
+
+ for e in obj:
+ pack(e, fp)
+
+def _pack_map(obj, fp):
+ if len(obj) <= 15:
+ fp.write(struct.pack("B", 0x80 | len(obj)))
+ elif len(obj) <= 2**16-1:
+ fp.write(b"\xde" + struct.pack(">H", len(obj)))
+ elif len(obj) <= 2**32-1:
+ fp.write(b"\xdf" + struct.pack(">I", len(obj)))
+ else:
+ raise UnsupportedTypeException("huge array")
+
+ for k,v in obj.items():
+ pack(k, fp)
+ pack(v, fp)
+
+########################################
+
+# Pack for Python 2, with 'unicode' type, 'str' type, and 'long' type
+def _pack2(obj, fp):
+ """
+ Serialize a Python object into MessagePack bytes.
+
+ Args:
+ obj: a Python object
+ fp: a .write()-supporting file-like object
+
+ Returns:
+ None.
+
+ Raises:
+ UnsupportedType(PackException):
+ Object type not supported for packing.
+
+ Example:
+ >>> f = open('test.bin', 'w')
+ >>> umsgpack.pack({u"compact": True, u"schema": 0}, f)
+ >>>
+ """
+
+ global compatibility
+
+ if obj is None:
+ _pack_nil(obj, fp)
+ elif isinstance(obj, bool):
+ _pack_boolean(obj, fp)
+ elif isinstance(obj, int) or isinstance(obj, long):
+ _pack_integer(obj, fp)
+ elif isinstance(obj, float):
+ _pack_float(obj, fp)
+ elif compatibility and isinstance(obj, unicode):
+ _pack_oldspec_raw(bytes(obj), fp)
+ elif compatibility and isinstance(obj, bytes):
+ _pack_oldspec_raw(obj, fp)
+ elif isinstance(obj, unicode):
+ _pack_string(obj, fp)
+ elif isinstance(obj, str):
+ _pack_binary(obj, fp)
+ elif isinstance(obj, list) or isinstance(obj, tuple):
+ _pack_array(obj, fp)
+ elif isinstance(obj, dict):
+ _pack_map(obj, fp)
+ elif isinstance(obj, Ext):
+ _pack_ext(obj, fp)
+ else:
+ raise UnsupportedTypeException("unsupported type: %s" % str(type(obj)))
+
+# Pack for Python 3, with unicode 'str' type, 'bytes' type, and no 'long' type
+def _pack3(obj, fp):
+ """
+ Serialize a Python object into MessagePack bytes.
+
+ Args:
+ obj: a Python object
+ fp: a .write()-supporting file-like object
+
+ Returns:
+ None.
+
+ Raises:
+ UnsupportedType(PackException):
+ Object type not supported for packing.
+
+ Example:
+ >>> f = open('test.bin', 'w')
+ >>> umsgpack.pack({u"compact": True, u"schema": 0}, f)
+ >>>
+ """
+ global compatibility
+
+ if obj is None:
+ _pack_nil(obj, fp)
+ elif isinstance(obj, bool):
+ _pack_boolean(obj, fp)
+ elif isinstance(obj, int):
+ _pack_integer(obj, fp)
+ elif isinstance(obj, float):
+ _pack_float(obj, fp)
+ elif compatibility and isinstance(obj, str):
+ _pack_oldspec_raw(obj.encode('utf-8'), fp)
+ elif compatibility and isinstance(obj, bytes):
+ _pack_oldspec_raw(obj, fp)
+ elif isinstance(obj, str):
+ _pack_string(obj, fp)
+ elif isinstance(obj, bytes):
+ _pack_binary(obj, fp)
+ elif isinstance(obj, list) or isinstance(obj, tuple):
+ _pack_array(obj, fp)
+ elif isinstance(obj, dict):
+ _pack_map(obj, fp)
+ elif isinstance(obj, Ext):
+ _pack_ext(obj, fp)
+ else:
+ raise UnsupportedTypeException("unsupported type: %s" % str(type(obj)))
+
+def _packb2(obj):
+ """
+ Serialize a Python object into MessagePack bytes.
+
+ Args:
+ obj: a Python object
+
+ Returns:
+ A 'str' containing serialized MessagePack bytes.
+
+ Raises:
+ UnsupportedType(PackException):
+ Object type not supported for packing.
+
+ Example:
+ >>> umsgpack.packb({u"compact": True, u"schema": 0})
+ '\x82\xa7compact\xc3\xa6schema\x00'
+ >>>
+ """
+ fp = io.BytesIO()
+ _pack2(obj, fp)
+ return fp.getvalue()
+
+def _packb3(obj):
+ """
+ Serialize a Python object into MessagePack bytes.
+
+ Args:
+ obj: a Python object
+
+ Returns:
+ A 'bytes' containing serialized MessagePack bytes.
+
+ Raises:
+ UnsupportedType(PackException):
+ Object type not supported for packing.
+
+ Example:
+ >>> umsgpack.packb({u"compact": True, u"schema": 0})
+ b'\x82\xa7compact\xc3\xa6schema\x00'
+ >>>
+ """
+ fp = io.BytesIO()
+ _pack3(obj, fp)
+ return fp.getvalue()
+
+################################################################################
+### Unpacking
+################################################################################
+
+def _read_except(fp, n):
+ data = fp.read(n)
+ if len(data) < n:
+ raise InsufficientDataException()
+ return data
+
+def _unpack_integer(code, fp):
+ if (ord(code) & 0xe0) == 0xe0:
+ return struct.unpack("b", code)[0]
+ elif code == b'\xd0':
+ return struct.unpack("b", _read_except(fp, 1))[0]
+ elif code == b'\xd1':
+ return struct.unpack(">h", _read_except(fp, 2))[0]
+ elif code == b'\xd2':
+ return struct.unpack(">i", _read_except(fp, 4))[0]
+ elif code == b'\xd3':
+ return struct.unpack(">q", _read_except(fp, 8))[0]
+ elif (ord(code) & 0x80) == 0x00:
+ return struct.unpack("B", code)[0]
+ elif code == b'\xcc':
+ return struct.unpack("B", _read_except(fp, 1))[0]
+ elif code == b'\xcd':
+ return struct.unpack(">H", _read_except(fp, 2))[0]
+ elif code == b'\xce':
+ return struct.unpack(">I", _read_except(fp, 4))[0]
+ elif code == b'\xcf':
+ return struct.unpack(">Q", _read_except(fp, 8))[0]
+ raise Exception("logic error, not int: 0x%02x" % ord(code))
+
+def _unpack_reserved(code, fp):
+ if code == b'\xc1':
+ raise ReservedCodeException("encountered reserved code: 0x%02x" % ord(code))
+ raise Exception("logic error, not reserved code: 0x%02x" % ord(code))
+
+def _unpack_nil(code, fp):
+ if code == b'\xc0':
+ return None
+ raise Exception("logic error, not nil: 0x%02x" % ord(code))
+
+def _unpack_boolean(code, fp):
+ if code == b'\xc2':
+ return False
+ elif code == b'\xc3':
+ return True
+ raise Exception("logic error, not boolean: 0x%02x" % ord(code))
+
+def _unpack_float(code, fp):
+ if code == b'\xca':
+ return struct.unpack(">f", _read_except(fp, 4))[0]
+ elif code == b'\xcb':
+ return struct.unpack(">d", _read_except(fp, 8))[0]
+ raise Exception("logic error, not float: 0x%02x" % ord(code))
+
+def _unpack_string(code, fp):
+ if (ord(code) & 0xe0) == 0xa0:
+ length = ord(code) & ~0xe0
+ elif code == b'\xd9':
+ length = struct.unpack("B", _read_except(fp, 1))[0]
+ elif code == b'\xda':
+ length = struct.unpack(">H", _read_except(fp, 2))[0]
+ elif code == b'\xdb':
+ length = struct.unpack(">I", _read_except(fp, 4))[0]
+ else:
+ raise Exception("logic error, not string: 0x%02x" % ord(code))
+
+ # Always return raw bytes in compatibility mode
+ global compatibility
+ if compatibility:
+ return _read_except(fp, length)
+
+ try:
+ return bytes.decode(_read_except(fp, length), 'utf-8')
+ except UnicodeDecodeError:
+ raise InvalidStringException("unpacked string is not utf-8")
+
+def _unpack_binary(code, fp):
+ if code == b'\xc4':
+ length = struct.unpack("B", _read_except(fp, 1))[0]
+ elif code == b'\xc5':
+ length = struct.unpack(">H", _read_except(fp, 2))[0]
+ elif code == b'\xc6':
+ length = struct.unpack(">I", _read_except(fp, 4))[0]
+ else:
+ raise Exception("logic error, not binary: 0x%02x" % ord(code))
+
+ return _read_except(fp, length)
+
+def _unpack_ext(code, fp):
+ if code == b'\xd4':
+ length = 1
+ elif code == b'\xd5':
+ length = 2
+ elif code == b'\xd6':
+ length = 4
+ elif code == b'\xd7':
+ length = 8
+ elif code == b'\xd8':
+ length = 16
+ elif code == b'\xc7':
+ length = struct.unpack("B", _read_except(fp, 1))[0]
+ elif code == b'\xc8':
+ length = struct.unpack(">H", _read_except(fp, 2))[0]
+ elif code == b'\xc9':
+ length = struct.unpack(">I", _read_except(fp, 4))[0]
+ else:
+ raise Exception("logic error, not ext: 0x%02x" % ord(code))
+
+ return Ext(ord(_read_except(fp, 1)), _read_except(fp, length))
+
+def _unpack_array(code, fp):
+ if (ord(code) & 0xf0) == 0x90:
+ length = (ord(code) & ~0xf0)
+ elif code == b'\xdc':
+ length = struct.unpack(">H", _read_except(fp, 2))[0]
+ elif code == b'\xdd':
+ length = struct.unpack(">I", _read_except(fp, 4))[0]
+ else:
+ raise Exception("logic error, not array: 0x%02x" % ord(code))
+
+ return [_unpack(fp) for i in range(length)]
+
+def _deep_list_to_tuple(obj):
+ if isinstance(obj, list):
+ return tuple([_deep_list_to_tuple(e) for e in obj])
+ return obj
+
+def _unpack_map(code, fp):
+ if (ord(code) & 0xf0) == 0x80:
+ length = (ord(code) & ~0xf0)
+ elif code == b'\xde':
+ length = struct.unpack(">H", _read_except(fp, 2))[0]
+ elif code == b'\xdf':
+ length = struct.unpack(">I", _read_except(fp, 4))[0]
+ else:
+ raise Exception("logic error, not map: 0x%02x" % ord(code))
+
+ d = {}
+ for i in range(length):
+ # Unpack key
+ k = _unpack(fp)
+
+ if isinstance(k, list):
+ # Attempt to convert list into a hashable tuple
+ k = _deep_list_to_tuple(k)
+ elif not isinstance(k, collections.Hashable):
+ raise UnhashableKeyException("encountered unhashable key: %s, %s" % (str(k), str(type(k))))
+ elif k in d:
+ raise DuplicateKeyException("encountered duplicate key: %s, %s" % (str(k), str(type(k))))
+
+ # Unpack value
+ v = _unpack(fp)
+
+ try:
+ d[k] = v
+ except TypeError:
+ raise UnhashableKeyException("encountered unhashable key: %s" % str(k))
+ return d
+
+def _unpack(fp):
+ code = _read_except(fp, 1)
+ return _unpack_dispatch_table[code](code, fp)
+
+########################################
+
+def _unpack2(fp):
+ """
+ Deserialize MessagePack bytes into a Python object.
+
+ Args:
+ fp: a .read()-supporting file-like object
+
+ Returns:
+ A Python object.
+
+ Raises:
+ InsufficientDataException(UnpackException):
+ Insufficient data to unpack the encoded object.
+ InvalidStringException(UnpackException):
+ Invalid UTF-8 string encountered during unpacking.
+ ReservedCodeException(UnpackException):
+ Reserved code encountered during unpacking.
+ UnhashableKeyException(UnpackException):
+ Unhashable key encountered during map unpacking.
+ The serialized map cannot be deserialized into a Python dictionary.
+ DuplicateKeyException(UnpackException):
+ Duplicate key encountered during map unpacking.
+
+ Example:
+ >>> f = open("test.bin")
+ >>> umsgpack.unpack(f)
+ {u'compact': True, u'schema': 0}
+ >>>
+ """
+ return _unpack(fp)
+
+def _unpack3(fp):
+ """
+ Deserialize MessagePack bytes into a Python object.
+
+ Args:
+ fp: a .read()-supporting file-like object
+
+ Returns:
+ A Python object.
+
+ Raises:
+ InsufficientDataException(UnpackException):
+ Insufficient data to unpack the encoded object.
+ InvalidStringException(UnpackException):
+ Invalid UTF-8 string encountered during unpacking.
+ ReservedCodeException(UnpackException):
+ Reserved code encountered during unpacking.
+ UnhashableKeyException(UnpackException):
+ Unhashable key encountered during map unpacking.
+ The serialized map cannot be deserialized into a Python dictionary.
+ DuplicateKeyException(UnpackException):
+ Duplicate key encountered during map unpacking.
+
+ Example:
+ >>> f = open("test.bin")
+ >>> umsgpack.unpack(f)
+ {'compact': True, 'schema': 0}
+ >>>
+ """
+ return _unpack(fp)
+
+# For Python 2, expects a str object
+def _unpackb2(s):
+ """
+ Deserialize MessagePack bytes into a Python object.
+
+ Args:
+ s: a 'str' containing serialized MessagePack bytes
+
+ Returns:
+ A Python object.
+
+ Raises:
+ TypeError:
+ Packed data is not type 'str'.
+ InsufficientDataException(UnpackException):
+ Insufficient data to unpack the encoded object.
+ InvalidStringException(UnpackException):
+ Invalid UTF-8 string encountered during unpacking.
+ ReservedCodeException(UnpackException):
+ Reserved code encountered during unpacking.
+ UnhashableKeyException(UnpackException):
+ Unhashable key encountered during map unpacking.
+ The serialized map cannot be deserialized into a Python dictionary.
+ DuplicateKeyException(UnpackException):
+ Duplicate key encountered during map unpacking.
+
+ Example:
+ >>> umsgpack.unpackb(b'\x82\xa7compact\xc3\xa6schema\x00')
+ {u'compact': True, u'schema': 0}
+ >>>
+ """
+ if not isinstance(s, str):
+ raise TypeError("packed data is not type 'str'")
+ return _unpack(io.BytesIO(s))
+
+# For Python 3, expects a bytes object
+def _unpackb3(s):
+ """
+ Deserialize MessagePack bytes into a Python object.
+
+ Args:
+ s: a 'bytes' containing serialized MessagePack bytes
+
+ Returns:
+ A Python object.
+
+ Raises:
+ TypeError:
+ Packed data is not type 'bytes'.
+ InsufficientDataException(UnpackException):
+ Insufficient data to unpack the encoded object.
+ InvalidStringException(UnpackException):
+ Invalid UTF-8 string encountered during unpacking.
+ ReservedCodeException(UnpackException):
+ Reserved code encountered during unpacking.
+ UnhashableKeyException(UnpackException):
+ Unhashable key encountered during map unpacking.
+ The serialized map cannot be deserialized into a Python dictionary.
+ DuplicateKeyException(UnpackException):
+ Duplicate key encountered during map unpacking.
+
+ Example:
+ >>> umsgpack.unpackb(b'\x82\xa7compact\xc3\xa6schema\x00')
+ {'compact': True, 'schema': 0}
+ >>>
+ """
+ if not isinstance(s, bytes):
+ raise TypeError("packed data is not type 'bytes'")
+ return _unpack(io.BytesIO(s))
+
+################################################################################
+### Module Initialization
+################################################################################
+
+def __init():
+ global pack
+ global packb
+ global unpack
+ global unpackb
+ global dump
+ global dumps
+ global load
+ global loads
+ global compatibility
+ global _float_size
+ global _unpack_dispatch_table
+
+ # Compatibility mode for handling strings/bytes with the old specification
+ compatibility = False
+
+ # Auto-detect system float precision
+ if sys.float_info.mant_dig == 53:
+ _float_size = 64
+ else:
+ _float_size = 32
+
+ # Map packb and unpackb to the appropriate version
+ if sys.version_info[0] == 3:
+ pack = _pack3
+ packb = _packb3
+ dump = _pack3
+ dumps = _packb3
+ unpack = _unpack3
+ unpackb = _unpackb3
+ load = _unpack3
+ loads = _unpackb3
+ else:
+ pack = _pack2
+ packb = _packb2
+ dump = _pack2
+ dumps = _packb2
+ unpack = _unpack2
+ unpackb = _unpackb2
+ load = _unpack2
+ loads = _unpackb2
+
+ # Build a dispatch table for fast lookup of unpacking function
+
+ _unpack_dispatch_table = {}
+ # Fix uint
+ for code in range(0, 0x7f+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
+ # Fix map
+ for code in range(0x80, 0x8f+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_map
+ # Fix array
+ for code in range(0x90, 0x9f+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_array
+ # Fix str
+ for code in range(0xa0, 0xbf+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_string
+ # Nil
+ _unpack_dispatch_table[b'\xc0'] = _unpack_nil
+ # Reserved
+ _unpack_dispatch_table[b'\xc1'] = _unpack_reserved
+ # Boolean
+ _unpack_dispatch_table[b'\xc2'] = _unpack_boolean
+ _unpack_dispatch_table[b'\xc3'] = _unpack_boolean
+ # Bin
+ for code in range(0xc4, 0xc6+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_binary
+ # Ext
+ for code in range(0xc7, 0xc9+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_ext
+ # Float
+ _unpack_dispatch_table[b'\xca'] = _unpack_float
+ _unpack_dispatch_table[b'\xcb'] = _unpack_float
+ # Uint
+ for code in range(0xcc, 0xcf+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
+ # Int
+ for code in range(0xd0, 0xd3+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
+ # Fixext
+ for code in range(0xd4, 0xd8+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_ext
+ # String
+ for code in range(0xd9, 0xdb+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_string
+ # Array
+ _unpack_dispatch_table[b'\xdc'] = _unpack_array
+ _unpack_dispatch_table[b'\xdd'] = _unpack_array
+ # Map
+ _unpack_dispatch_table[b'\xde'] = _unpack_map
+ _unpack_dispatch_table[b'\xdf'] = _unpack_map
+ # Negative fixint
+ for code in range(0xe0, 0xff+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
+
+__init()
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
new file mode 100644
index 0000000..68d4017
--- /dev/null
+++ b/wally/ssh_utils.py
@@ -0,0 +1,306 @@
+import re
+import time
+import socket
+import logging
+import os.path
+import getpass
+import threading
+
+
+import paramiko
+
+
+logger = logging.getLogger("wally")
+
+
+def ssh_connect(creds, retry_count=6, timeout=10):
+ ssh = paramiko.SSHClient()
+ ssh.load_host_keys('/dev/null')
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ ssh.known_hosts = None
+ for i in range(retry_count):
+ try:
+ if creds.user is None:
+ user = getpass.getuser()
+ else:
+ user = creds.user
+
+ if creds.passwd is not None:
+ ssh.connect(creds.host,
+ timeout=timeout, # tcp connect timeout
+ username=user,
+ password=creds.passwd,
+ port=creds.port,
+ allow_agent=False,
+ look_for_keys=False)
+ return ssh
+
+ if creds.key_file is not None:
+ ssh.connect(creds.host,
+ username=user,
+ timeout=timeout, # tcp connect timeout
+ key_filename=creds.key_file,
+ look_for_keys=False,
+ port=creds.port)
+ return ssh
+
+ key_file = os.path.expanduser('~/.ssh/id_rsa')
+ ssh.connect(creds.host,
+ username=user,
+ timeout=timeout, # tcp connect timeout
+ key_filename=key_file,
+ look_for_keys=False,
+ port=creds.port)
+ return ssh
+ # raise ValueError("Wrong credentials {0}".format(creds.__dict__))
+ except paramiko.PasswordRequiredException:
+ raise
+ except socket.error as err:
+ logger.warning("ssh connect error: %s", err)
+ if i == retry_count - 1:
+ raise
+ time.sleep(1)
+
+
+def normalize_dirpath(dirpath):
+ while dirpath.endswith("/"):
+ dirpath = dirpath[:-1]
+ return dirpath
+
+
+ALL_RWX_MODE = ((1 << 9) - 1)
+
+
+def ssh_mkdir(sftp, remotepath, mode=ALL_RWX_MODE, intermediate=False):
+ remotepath = normalize_dirpath(remotepath)
+ if intermediate:
+ try:
+ sftp.mkdir(remotepath, mode=mode)
+ except IOError:
+ ssh_mkdir(sftp, remotepath.rsplit("/", 1)[0], mode=mode,
+ intermediate=True)
+ return sftp.mkdir(remotepath, mode=mode)
+ else:
+ sftp.mkdir(remotepath, mode=mode)
+
+
+def ssh_copy_file(sftp, localfile, remfile, preserve_perm=True):
+ sftp.put(localfile, remfile)
+ if preserve_perm:
+ sftp.chmod(remfile, os.stat(localfile).st_mode & ALL_RWX_MODE)
+
+
+def put_dir_recursively(sftp, localpath, remotepath, preserve_perm=True):
+ "upload local directory to remote recursively"
+
+ # hack for localhost connection
+ if hasattr(sftp, "copytree"):
+ sftp.copytree(localpath, remotepath)
+ return
+
+ assert remotepath.startswith("/"), "%s must be absolute path" % remotepath
+
+ # normalize
+ localpath = normalize_dirpath(localpath)
+ remotepath = normalize_dirpath(remotepath)
+
+ try:
+ sftp.chdir(remotepath)
+ localsuffix = localpath.rsplit("/", 1)[1]
+ remotesuffix = remotepath.rsplit("/", 1)[1]
+ if localsuffix != remotesuffix:
+ remotepath = os.path.join(remotepath, localsuffix)
+ except IOError:
+ pass
+
+ for root, dirs, fls in os.walk(localpath):
+ prefix = os.path.commonprefix([localpath, root])
+ suffix = root.split(prefix, 1)[1]
+ if suffix.startswith("/"):
+ suffix = suffix[1:]
+
+ remroot = os.path.join(remotepath, suffix)
+
+ try:
+ sftp.chdir(remroot)
+ except IOError:
+ if preserve_perm:
+ mode = os.stat(root).st_mode & ALL_RWX_MODE
+ else:
+ mode = ALL_RWX_MODE
+ ssh_mkdir(sftp, remroot, mode=mode, intermediate=True)
+ sftp.chdir(remroot)
+
+ for f in fls:
+ remfile = os.path.join(remroot, f)
+ localfile = os.path.join(root, f)
+ ssh_copy_file(sftp, localfile, remfile, preserve_perm)
+
+
+def delete_file(conn, path):
+ sftp = conn.open_sftp()
+ sftp.remove(path)
+ sftp.close()
+
+
+def copy_paths(conn, paths):
+ sftp = conn.open_sftp()
+ try:
+ for src, dst in paths.items():
+ try:
+ if os.path.isfile(src):
+ ssh_copy_file(sftp, src, dst)
+ elif os.path.isdir(src):
+ put_dir_recursively(sftp, src, dst)
+ else:
+ templ = "Can't copy {0!r} - " + \
+ "it neither a file not a directory"
+ msg = templ.format(src)
+ raise OSError(msg)
+ except Exception as exc:
+ tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
+ msg = tmpl.format(src, dst, exc)
+ raise OSError(msg)
+ finally:
+ sftp.close()
+
+
+class ConnCreds(object):
+ conn_uri_attrs = ("user", "passwd", "host", "port", "path")
+
+ def __init__(self):
+ for name in self.conn_uri_attrs:
+ setattr(self, name, None)
+
+ def __str__(self):
+ return str(self.__dict__)
+
+
+uri_reg_exprs = []
+
+
+class URIsNamespace(object):
+ class ReParts(object):
+ user_rr = "[^:]*?"
+ host_rr = "[^:]*?"
+ port_rr = "\\d+"
+ key_file_rr = "[^:@]*"
+ passwd_rr = ".*?"
+
+ re_dct = ReParts.__dict__
+
+ for attr_name, val in re_dct.items():
+ if attr_name.endswith('_rr'):
+ new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
+ setattr(ReParts, attr_name, new_rr)
+
+ re_dct = ReParts.__dict__
+
+ templs = [
+ "^{host_rr}$",
+ "^{user_rr}@{host_rr}::{key_file_rr}$",
+ "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$",
+ "^{user_rr}:{passwd_rr}@@{host_rr}$",
+ "^{user_rr}:{passwd_rr}@@{host_rr}:{port_rr}$",
+ ]
+
+ for templ in templs:
+ uri_reg_exprs.append(templ.format(**re_dct))
+
+
+def parse_ssh_uri(uri):
+ # user:passwd@@ip_host:port
+ # user:passwd@@ip_host
+ # user@ip_host:port
+ # user@ip_host
+ # ip_host:port
+ # ip_host
+ # user@ip_host:port:path_to_key_file
+ # user@ip_host::path_to_key_file
+ # ip_host:port:path_to_key_file
+ # ip_host::path_to_key_file
+
+ res = ConnCreds()
+ res.port = "22"
+ res.key_file = None
+ res.passwd = None
+
+ for rr in uri_reg_exprs:
+ rrm = re.match(rr, uri)
+ if rrm is not None:
+ res.__dict__.update(rrm.groupdict())
+ return res
+
+ raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
+
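+# Illustrative parses (derived from the templates above):
+#   parse_ssh_uri("root:secret@@10.0.0.1:2222")
+#       -> user='root', passwd='secret', host='10.0.0.1', port='2222'
+#   parse_ssh_uri("user@10.0.0.1::/home/user/.ssh/id_rsa")
+#       -> user='user', host='10.0.0.1', key_file='/home/user/.ssh/id_rsa', port='22'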
+
+def connect(uri):
+ creds = parse_ssh_uri(uri)
+ creds.port = int(creds.port)
+ return ssh_connect(creds)
+
+
+all_sessions_lock = threading.Lock()
+all_sessions = []
+
+
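+# run_over_ssh() below returns the combined stdout/stderr of the command and
+# raises OSError on a non-zero exit code or on timeout, e.g. (illustrative):
+#   conn = connect("root@10.0.0.1::/root/.ssh/id_rsa")
+#   out = run_over_ssh(conn, "uname -a", node="node-1")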
+def run_over_ssh(conn, cmd, stdin_data=None, timeout=60,
+ nolog=False, node=None):
+ "should be replaces by normal implementation, with select"
+ transport = conn.get_transport()
+ session = transport.open_session()
+
+ if node is None:
+ node = ""
+
+ with all_sessions_lock:
+ all_sessions.append(session)
+
+ try:
+ session.set_combine_stderr(True)
+
+ stime = time.time()
+
+ if not nolog:
+ logger.debug("SSH:{0} Exec {1!r}".format(node, cmd))
+
+ session.exec_command(cmd)
+
+ if stdin_data is not None:
+ session.sendall(stdin_data)
+
+ session.settimeout(1)
+ session.shutdown_write()
+ output = ""
+
+ while True:
+ try:
+ ndata = session.recv(1024)
+ output += ndata
+ if "" == ndata:
+ break
+ except socket.timeout:
+ pass
+
+ if time.time() - stime > timeout:
+ raise OSError(output + "\nExecution timeout")
+
+ code = session.recv_exit_status()
+ finally:
+ session.close()
+
+ if code != 0:
+ templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
+ raise OSError(templ.format(node, cmd, code, output))
+
+ return output
+
+
+def close_all_sessions():
+ with all_sessions_lock:
+ for session in all_sessions:
+ try:
+ session.sendall('\x03')
+ session.close()
+ except:
+ pass
diff --git a/wally/start_vms.py b/wally/start_vms.py
new file mode 100644
index 0000000..4fd35ae
--- /dev/null
+++ b/wally/start_vms.py
@@ -0,0 +1,372 @@
+import re
+import os
+import time
+import os.path
+import logging
+import subprocess
+
+from concurrent.futures import ThreadPoolExecutor
+
+from novaclient.exceptions import NotFound
+from novaclient.client import Client as n_client
+from cinderclient.v1.client import Client as c_client
+
+import wally
+from wally.discover import Node
+
+
+logger = logging.getLogger("wally.vms")
+
+
+def ostack_get_creds():
+ env = os.environ.get
+ name = env('OS_USERNAME')
+ passwd = env('OS_PASSWORD')
+ tenant = env('OS_TENANT_NAME')
+ auth_url = env('OS_AUTH_URL')
+
+ return name, passwd, tenant, auth_url
+
+
+NOVA_CONNECTION = None
+
+
+def nova_connect(name=None, passwd=None, tenant=None, auth_url=None):
+ global NOVA_CONNECTION
+ if NOVA_CONNECTION is None:
+ if name is None:
+ name, passwd, tenant, auth_url = ostack_get_creds()
+ NOVA_CONNECTION = n_client('1.1', name, passwd, tenant, auth_url)
+ return NOVA_CONNECTION
+
+
+def nova_disconnect():
+ global NOVA_CONNECTION
+ if NOVA_CONNECTION is not None:
+ NOVA_CONNECTION.close()
+ NOVA_CONNECTION = None
+
+
+def prepare_os_subpr(name=None, passwd=None, tenant=None, auth_url=None):
+ if name is None:
+ name, passwd, tenant, auth_url = ostack_get_creds()
+
+ params = {
+ 'OS_USERNAME': name,
+ 'OS_PASSWORD': passwd,
+ 'OS_TENANT_NAME': tenant,
+ 'OS_AUTH_URL': auth_url
+ }
+
+ params_s = " ".join("{}={}".format(k, v) for k, v in params.items())
+
+ spath = os.path.dirname(wally.__file__)
+ spath = os.path.dirname(spath)
+ spath = os.path.join(spath, 'scripts/prepare.sh')
+
+ cmd_templ = "env {params} bash {spath} >/dev/null"
+ cmd = cmd_templ.format(params=params_s, spath=spath)
+ subprocess.call(cmd, shell=True)
+
+
+def prepare_os(nova, params):
+ allow_ssh(nova, params['security_group'])
+
+ shed_ids = []
+ for shed_group in params['schedulers_groups']:
+ shed_ids.append(get_or_create_aa_group(nova, shed_group))
+
+ create_keypair(nova,
+ params['keypair_name'],
+ params['pub_key_path'],
+ params['priv_key_path'])
+
+ create_image(nova, params['image']['name'],
+ params['image']['url'])
+
+ create_flavor(nova, **params['flavor'])
+
+
+def get_or_create_aa_group(nova, name):
+ try:
+ group = nova.server_groups.find(name=name)
+ except NotFound:
+ group = nova.server_groups.create({'name': name,
+ 'policies': ['anti-affinity']})
+
+ return group.id
+
+
+def allow_ssh(nova, group_name):
+ try:
+ secgroup = nova.security_groups.find(name=group_name)
+ except NotFound:
+ secgroup = nova.security_groups.create(group_name,
+ "allow ssh/ping to node")
+
+ nova.security_group_rules.create(secgroup.id,
+ ip_protocol="tcp",
+ from_port="22",
+ to_port="22",
+ cidr="0.0.0.0/0")
+
+ nova.security_group_rules.create(secgroup.id,
+ ip_protocol="icmp",
+ from_port=-1,
+ cidr="0.0.0.0/0",
+ to_port=-1)
+ return secgroup.id
+
+
+def create_image(nova, name, url):
+ pass
+
+
+def create_flavor(nova, name, **params):
+ pass
+
+
+def create_keypair(nova, name, pub_key_path, priv_key_path):
+ try:
+ nova.keypairs.find(name=name)
+ except NotFound:
+ if os.path.exists(pub_key_path):
+ with open(pub_key_path) as pub_key_fd:
+ return nova.keypairs.create(name, pub_key_fd.read())
+ else:
+ key = nova.keypairs.create(name)
+
+ with open(priv_key_path, "w") as priv_key_fd:
+ priv_key_fd.write(key.private_key)
+
+ with open(pub_key_path, "w") as pub_key_fd:
+ pub_key_fd.write(key.public_key)
+
+
+def create_volume(size, name):
+ cinder = c_client(*ostack_get_creds())
+ vol = cinder.volumes.create(size=size, display_name=name)
+ err_count = 0
+
+ while vol['status'] != 'available':
+ if vol['status'] == 'error':
+ if err_count == 3:
+ logger.critical("Fail to create volume")
+ raise RuntimeError("Fail to create volume")
+ else:
+ err_count += 1
+ cinder.volumes.delete(vol)
+ time.sleep(1)
+ vol = cinder.volumes.create(size=size, display_name=name)
+ continue
+ time.sleep(1)
+ vol = cinder.volumes.get(vol['id'])
+ return vol
+
+
+def wait_for_server_active(nova, server, timeout=240):
+ t = time.time()
+ while True:
+ time.sleep(1)
+ sstate = getattr(server, 'OS-EXT-STS:vm_state').lower()
+
+ if sstate == 'active':
+ return True
+
+ if sstate == 'error':
+ return False
+
+ if time.time() - t > timeout:
+ return False
+
+ server = nova.servers.get(server)
+
+
+class Allocate(object):
+ pass
+
+
+def get_floating_ips(nova, pool, amount):
+ ip_list = nova.floating_ips.list()
+
+ if pool is not None:
+ ip_list = [ip for ip in ip_list if ip.pool == pool]
+
+ return [ip for ip in ip_list if ip.instance_id is None][:amount]
+
+
+def launch_vms(params):
+ logger.debug("Starting new nodes on openstack")
+ params = params.copy()
+ count = params.pop('count')
+
+ if isinstance(count, basestring):
+ assert count.startswith("x")
+ lst = NOVA_CONNECTION.services.list(binary='nova-compute')
+ srv_count = len([srv for srv in lst if srv.status == 'enabled'])
+ count = srv_count * int(count[1:])
+
+ srv_params = "img: {image[name]}, flavor: {flavor[name]}".format(**params)
+ msg_templ = "Will start {0} servers with next params: {1}"
+ logger.info(msg_templ.format(count, srv_params))
+ vm_creds = params.pop('creds')
+
+ params = params.copy()
+
+ params['img_name'] = params['image']['name']
+ params['flavor_name'] = params['flavor']['name']
+
+ del params['image']
+ del params['flavor']
+ del params['scheduler_group_name']
+ private_key_path = params.pop('private_key_path')
+
+ for ip, os_node in create_vms_mt(NOVA_CONNECTION, count, **params):
+ conn_uri = vm_creds.format(ip=ip, private_key_path=private_key_path)
+ yield Node(conn_uri, []), os_node.id
+
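+# launch_vms() consumes a config dict roughly shaped as below. This is a
+# hedged sketch: the key set is inferred from the pops/deletes above and from
+# create_vms_mt()'s signature, and every value is only an example.
+# nova_connect() must have been called first so NOVA_CONNECTION is set.
+#
+#   {'count': 'x1',                 # int, or 'xN' = N per enabled compute node
+#    'image': {'name': 'wally_image'},
+#    'flavor': {'name': 'wally_flavor'},
+#    'scheduler_group_name': 'wally-aa',
+#    'group_name': 'wally',
+#    'keypair_name': 'wally_vm_key',
+#    'creds': 'ubuntu@{ip}::{private_key_path}',  # {ip}/{private_key_path} are substituted
+#    'private_key_path': '/home/user/.ssh/wally_vm_key',
+#    'flt_ip_pool': 'net04_ext',
+#    'vol_sz': 50,
+#    'security_group': 'wally_ssh'}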
+
+def create_vms_mt(nova, amount, group_name, keypair_name, img_name,
+ flavor_name, vol_sz=None, network_zone_name=None,
+ flt_ip_pool=None, name_templ='wally-{id}',
+ scheduler_hints=None, security_group=None):
+
+ with ThreadPoolExecutor(max_workers=16) as executor:
+ if network_zone_name is not None:
+ network_future = executor.submit(nova.networks.find,
+ label=network_zone_name)
+ else:
+ network_future = None
+
+ fl_future = executor.submit(nova.flavors.find, name=flavor_name)
+ img_future = executor.submit(nova.images.find, name=img_name)
+
+ if flt_ip_pool is not None:
+ ips_future = executor.submit(get_floating_ips,
+ nova, flt_ip_pool, amount)
+ logger.debug("Wait for floating ip")
+ ips = ips_future.result()
+ ips += [Allocate] * (amount - len(ips))
+ else:
+ ips = [None] * amount
+
+ logger.debug("Getting flavor object")
+ fl = fl_future.result()
+ logger.debug("Getting image object")
+ img = img_future.result()
+
+ if network_future is not None:
+ logger.debug("Waiting for network results")
+ nics = [{'net-id': network_future.result().id}]
+ else:
+ nics = None
+
+ names = []
+ for i in range(amount):
+ names.append(name_templ.format(group=group_name, id=i))
+
+ futures = []
+ logger.debug("Requesting new vms")
+
+ for name, flt_ip in zip(names, ips):
+ params = (nova, name, keypair_name, img, fl,
+ nics, vol_sz, flt_ip, scheduler_hints,
+ flt_ip_pool, [security_group])
+
+ futures.append(executor.submit(create_vm, *params))
+ res = [future.result() for future in futures]
+ logger.debug("Done spawning")
+ return res
+
+
+def create_vm(nova, name, keypair_name, img,
+ fl, nics, vol_sz=None,
+ flt_ip=False,
+ scheduler_hints=None,
+ pool=None,
+ security_groups=None):
+ for i in range(3):
+ srv = nova.servers.create(name,
+ flavor=fl,
+ image=img,
+ nics=nics,
+ key_name=keypair_name,
+ scheduler_hints=scheduler_hints,
+ security_groups=security_groups)
+
+ if not wait_for_server_active(nova, srv):
+ msg = "Server {0} fails to start. Kill it and try again"
+ logger.debug(msg.format(srv))
+ nova.servers.delete(srv)
+
+ for j in range(120):
+ # print "wait till server deleted"
+ all_id = set(alive_srv.id for alive_srv in nova.servers.list())
+ if srv.id not in all_id:
+ break
+ time.sleep(1)
+ else:
+ raise RuntimeError("Server {0} delete timeout".format(srv.id))
+ else:
+ break
+ else:
+ raise RuntimeError("Failed to start server".format(srv.id))
+
+ if vol_sz is not None:
+ # print "creating volume"
+ vol = create_volume(vol_sz, name)
+ # print "attach volume to server"
+ nova.volumes.create_server_volume(srv.id, vol['id'], None)
+
+ if flt_ip is Allocate:
+ flt_ip = nova.floating_ips.create(pool)
+
+ if flt_ip is not None:
+ # print "attaching ip to server"
+ srv.add_floating_ip(flt_ip)
+
+ return flt_ip.ip, nova.servers.get(srv.id)
+
+
+def clear_nodes(nodes_ids):
+ clear_all(NOVA_CONNECTION, nodes_ids, None)
+
+
+def clear_all(nova, ids=None, name_templ="ceph-test-{0}"):
+
+ def need_delete(srv):
+ if name_templ is not None:
+ return re.match(name_templ.format("\\d+"), srv.name) is not None
+ else:
+ return srv.id in ids
+
+ deleted_srvs = set()
+ for srv in nova.servers.list():
+ if need_delete(srv):
+ logger.debug("Deleting server {0}".format(srv.name))
+ nova.servers.delete(srv)
+ deleted_srvs.add(srv.id)
+
+ count = 0
+ while True:
+ if count % 60 == 0:
+ logger.debug("Waiting till all servers are actually deleted")
+ all_id = set(srv.id for srv in nova.servers.list())
+ if len(all_id.intersection(deleted_srvs)) == 0:
+ break
+ count += 1
+ time.sleep(1)
+ logger.debug("Done, deleting volumes")
+
+ # wait till vm actually deleted
+
+ if name_templ is not None:
+ cinder = c_client(*ostack_get_creds())
+ for vol in cinder.volumes.list():
+ if isinstance(vol.display_name, basestring):
+ if re.match(name_templ.format("\\d+"), vol.display_name):
+ if vol.status in ('available', 'error'):
+ logger.debug("Deleting volume " + vol.display_name)
+ cinder.volumes.delete(vol)
+
+ logger.debug("Clearing done (yet some volumes may still deleting)")
diff --git a/wally/statistic.py b/wally/statistic.py
new file mode 100644
index 0000000..0729283
--- /dev/null
+++ b/wally/statistic.py
@@ -0,0 +1,128 @@
+import math
+import itertools
+
+no_numpy = False
+
+try:
+ from numpy import array, linalg
+ from scipy.optimize import leastsq
+ from numpy.polynomial.chebyshev import chebfit, chebval
+except ImportError:
+ no_numpy = True
+
+
+def med_dev(vals):
+ med = sum(vals) / len(vals)
+ dev = ((sum(abs(med - i) ** 2.0 for i in vals) / len(vals)) ** 0.5)
+ return med, dev
+
+
+def round_deviation(med_dev):
+ med, dev = med_dev
+
+ if dev < 1E-7:
+ return med_dev
+
+ dev_div = 10.0 ** (math.floor(math.log10(dev)) - 1)
+ dev = int(dev / dev_div) * dev_div
+ med = int(med / dev_div) * dev_div
+ return (type(med_dev[0])(med),
+ type(med_dev[1])(dev))
+
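+# Illustrative: round_deviation((27.345, 0.123)) truncates both values to the
+# step set by the deviation's two leading digits, giving roughly (27.34, 0.12).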
+
+def groupby_globally(data, key_func):
+ grouped = {}
+ grouped_iter = itertools.groupby(data, key_func)
+
+ for (bs, cache_tp, act, conc), curr_data_it in grouped_iter:
+ key = (bs, cache_tp, act, conc)
+ grouped.setdefault(key, []).extend(curr_data_it)
+
+ return grouped
+
+
+def approximate_curve(x, y, xnew, curved_coef):
+ """returns ynew - y values of some curve approximation"""
+ if no_numpy:
+ return None
+
+ return chebval(xnew, chebfit(x, y, curved_coef))
+
+
+def approximate_line(x, y, xnew, relative_dist=False):
+ """ x, y - test data, xnew - dots, where we want find approximation
+ if not relative_dist distance = y - newy
+ returns ynew - y values of linear approximation"""
+
+ if no_numpy:
+ return None
+
+ # convert to numpy.array (doesn't work without it)
+ ox = array(x)
+ oy = array(y)
+
+ # set approximation function
+ def func_line(tpl, x):
+ return tpl[0] * x + tpl[1]
+
+ def error_func_rel(tpl, x, y):
+ return 1.0 - y / func_line(tpl, x)
+
+ def error_func_abs(tpl, x, y):
+ return y - func_line(tpl, x)
+
+ # choose distance mode
+ error_func = error_func_rel if relative_dist else error_func_abs
+
+ tpl_initial = tuple(linalg.solve([[ox[0], 1.0], [ox[1], 1.0]],
+ oy[:2]))
+
+ # find line
+ tpl_final, success = leastsq(error_func,
+ tpl_initial[:],
+ args=(ox, oy))
+
+ # if error
+ if success not in range(1, 5):
+ raise ValueError("No line for this dots")
+
+ # return new dots
+ return func_line(tpl_final, array(xnew))
+
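+# Illustrative: for x = [1, 2, 3, 4] and y = [2.1, 3.9, 6.2, 8.1] (roughly
+# y = 2x), approximate_line(x, y, [5, 6]) returns values close to [10, 12].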
+
+def difference(y, ynew):
+ """returns average and maximum relative and
+ absolute differences between y and ynew
+ raises ZeroDivisionError if some y value is too close to zero
+ return value - tuple:
+ [(abs dif, rel dif) * len(y)],
+ (abs average, abs max),
+ (rel average, rel max)"""
+
+ abs_dlist = []
+ rel_dlist = []
+
+ for y1, y2 in zip(y, ynew):
+ # absolute
+ abs_dlist.append(y1 - y2)
+
+ if y1 > 1E-6:
+ rel_dlist.append(abs(abs_dlist[-1] / y1))
+ else:
+ raise ZeroDivisionError("{0!r} is too small".format(y1))
+
+ da_avg = sum(abs_dlist) / len(abs_dlist)
+ dr_avg = sum(rel_dlist) / len(rel_dlist)
+
+ return (zip(abs_dlist, rel_dlist),
+ (da_avg, max(abs_dlist)), (dr_avg, max(rel_dlist))
+ )
+
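+# Illustrative: difference([10.0, 20.0], [11.0, 19.0]) returns, up to float
+# rounding, ([(-1.0, 0.1), (1.0, 0.05)], (0.0, 1.0), (0.075, 0.1)).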
+
+def calculate_distribution_properties(data):
+ """chi, etc"""
+
+
+def minimal_measurement_amount(data, max_diff, req_probability):
+ """
+ should return the number of measurements needed to get results (avg and
+ deviation) with an error less than max_diff in at least req_probability% of cases
+ """
diff --git a/wally/suits/__init__.py b/wally/suits/__init__.py
new file mode 100644
index 0000000..7b6610e
--- /dev/null
+++ b/wally/suits/__init__.py
@@ -0,0 +1,3 @@
+from .itest import TwoScriptTest, PgBenchTest, IOPerfTest
+
+__all__ = ["TwoScriptTest", "PgBenchTest", "IOPerfTest"]
diff --git a/wally/suits/io/__init__.py b/wally/suits/io/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wally/suits/io/__init__.py
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
new file mode 100644
index 0000000..7589346
--- /dev/null
+++ b/wally/suits/io/agent.py
@@ -0,0 +1,688 @@
+import sys
+import time
+import json
+import random
+import select
+import pprint
+import argparse
+import traceback
+import subprocess
+import itertools
+from collections import OrderedDict
+
+
+SECTION = 0
+SETTING = 1
+
+
+def get_test_sync_mode(config):
+ try:
+ return config['sync_mode']
+ except KeyError:
+ pass
+
+ is_sync = config.get("sync", "0") == "1"
+ is_direct = config.get("direct", "0") == "1"
+
+ if is_sync and is_direct:
+ return 'x'
+ elif is_sync:
+ return 's'
+ elif is_direct:
+ return 'd'
+ else:
+ return 'a'
+
+
+def get_test_summary(params):
+ rw = {"randread": "rr",
+ "randwrite": "rw",
+ "read": "sr",
+ "write": "sw"}[params["rw"]]
+
+ sync_mode = get_test_sync_mode(params)
+ th_count = params.get('numjobs')
+ if th_count is None:
+ th_count = params.get('concurence', '1')
+ th_count = int(th_count)
+
+ return "{0}{1}{2}th{3}".format(rw,
+ sync_mode,
+ params['blocksize'],
+ th_count)
+
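+# Illustrative: get_test_summary({'rw': 'randwrite', 'sync': '1',
+# 'blocksize': '4k', 'numjobs': '8'}) -> 'rws4kth8'
+# (random write, sync mode 's', 4k blocks, 8 threads).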
+
+counter = [0]
+
+
+def process_section(name, vals, defaults, format_params):
+ vals = vals.copy()
+ params = format_params.copy()
+
+ if '*' in name:
+ name, repeat = name.split('*')
+ name = name.strip()
+ repeat = int(repeat.format(**params))
+ else:
+ repeat = 1
+
+ # this code can be optimized
+ iterable_names = []
+ iterable_values = []
+ processed_vals = {}
+
+ for val_name, val in vals.items():
+ if val is None:
+ processed_vals[val_name] = val
+ # TODO: don't hardcode the '{% ... %}' marker handling
+ elif val.startswith('{%'):
+ assert val.endswith("%}")
+ content = val[2:-2].format(**params)
+ iterable_names.append(val_name)
+ iterable_values.append(list(i.strip() for i in content.split(',')))
+ else:
+ processed_vals[val_name] = val.format(**params)
+
+ group_report_err_msg = "Group reporting should be set if numjobs != 1"
+
+ if iterable_values == []:
+ params['UNIQ'] = 'UN{0}'.format(counter[0])
+ counter[0] += 1
+ params['TEST_SUMM'] = get_test_summary(processed_vals)
+
+ if processed_vals.get('numjobs', '1') != '1':
+ assert 'group_reporting' in processed_vals, group_report_err_msg
+
+ ramp_time = processed_vals.get('ramp_time')
+ for i in range(repeat):
+ yield name.format(**params), processed_vals.copy()
+
+ if 'ramp_time' in processed_vals:
+ del processed_vals['ramp_time']
+
+ if ramp_time is not None:
+ processed_vals['ramp_time'] = ramp_time
+ else:
+ for it_vals in itertools.product(*iterable_values):
+ processed_vals.update(dict(zip(iterable_names, it_vals)))
+ params['UNIQ'] = 'UN{0}'.format(counter[0])
+ counter[0] += 1
+ params['TEST_SUMM'] = get_test_summary(processed_vals)
+
+ if processed_vals.get('numjobs', '1') != '1':
+ assert 'group_reporting' in processed_vals,\
+ group_report_err_msg
+
+ ramp_time = processed_vals.get('ramp_time')
+
+ for i in range(repeat):
+ yield name.format(**params), processed_vals.copy()
+ if 'ramp_time' in processed_vals:
+ processed_vals['_ramp_time'] = ramp_time
+ processed_vals.pop('ramp_time')
+
+ if ramp_time is not None:
+ processed_vals['ramp_time'] = ramp_time
+ processed_vals.pop('_ramp_time')
+
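+# process_section() above (driven by parse_fio_config_full() below) expands a
+# small extension of the fio job-file syntax. An illustrative input:
+#
+#   [defaults]
+#   NUMJOBS=4
+#
+#   [write_test * 2]
+#   blocksize={% 4k, 64k %}
+#   rw=randwrite
+#   numjobs={NUMJOBS}
+#   group_reporting
+#
+# '{% ... %}' values expand as a cartesian product, '* 2' repeats every
+# resulting job twice, and UPPERCASE settings from [defaults] become format
+# parameters, so this yields 2 * 2 = 4 fio sections.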
+
+def calculate_execution_time(combinations):
+ time = 0
+ for _, params in combinations:
+ time += int(params.get('ramp_time', 0))
+ time += int(params.get('runtime', 0))
+ return time
+
+
+def parse_fio_config_full(fio_cfg, params=None):
+ defaults = {}
+ format_params = {}
+
+ if params is None:
+ ext_params = {}
+ else:
+ ext_params = params.copy()
+
+ curr_section = None
+ curr_section_name = None
+
+ for tp, name, val in parse_fio_config_iter(fio_cfg):
+ if tp == SECTION:
+ non_def = curr_section_name != 'defaults'
+ if curr_section_name is not None and non_def:
+ format_params.update(ext_params)
+ for sec in process_section(curr_section_name,
+ curr_section,
+ defaults,
+ format_params):
+ yield sec
+
+ if name == 'defaults':
+ curr_section = defaults
+ else:
+ curr_section = OrderedDict()
+ curr_section.update(defaults)
+ curr_section_name = name
+
+ else:
+ assert tp == SETTING
+ assert curr_section_name is not None, "no section name"
+ if name == name.upper():
+ assert curr_section_name == 'defaults'
+ format_params[name] = val
+ else:
+ curr_section[name] = val
+
+ if curr_section_name is not None and curr_section_name != 'defaults':
+ format_params.update(ext_params)
+ for sec in process_section(curr_section_name,
+ curr_section,
+ defaults,
+ format_params):
+ yield sec
+
+
+def parse_fio_config_iter(fio_cfg):
+ for lineno, line in enumerate(fio_cfg.split("\n")):
+ try:
+ line = line.strip()
+
+ if line.startswith("#") or line.startswith(";"):
+ continue
+
+ if line == "":
+ continue
+
+ if line.startswith('['):
+ assert line.endswith(']'), "section name should end with ']'"
+ yield SECTION, line[1:-1], None
+ elif '=' in line:
+ opt_name, opt_val = line.split('=', 1)
+ yield SETTING, opt_name.strip(), opt_val.strip()
+ else:
+ yield SETTING, line, None
+ except Exception as exc:
+ pref = "During parsing line number {0}\n".format(lineno)
+ raise ValueError(pref + exc.message)
+
+
+def format_fio_config(fio_cfg):
+ res = ""
+ for pos, (name, section) in enumerate(fio_cfg):
+ if name.startswith('_'):
+ continue
+
+ if pos != 0:
+ res += "\n"
+
+ res += "[{0}]\n".format(name)
+ for opt_name, opt_val in section.items():
+ if opt_val is None:
+ res += opt_name + "\n"
+ else:
+ res += "{0}={1}\n".format(opt_name, opt_val)
+ return res
+
+
+count = 0
+
+
+def to_bytes(sz):
+ sz = sz.lower()
+ try:
+ return int(sz)
+ except ValueError:
+ if sz[-1] == 'm':
+ return (1024 ** 2) * int(sz[:-1])
+ if sz[-1] == 'k':
+ return 1024 * int(sz[:-1])
+ raise
+
+
+def do_run_fio_fake(bconf):
+ def estimate_iops(sz, bw, lat):
+ return 1 / (lat + float(sz) / bw)
+ global count
+ count += 1
+ parsed_out = []
+
+ BW = 120.0 * (1024 ** 2)
+ LAT = 0.003
+
+ for name, cfg in bconf:
+ sz = to_bytes(cfg['blocksize'])
+ curr_lat = LAT * ((random.random() - 0.5) * 0.1 + 1)
+ curr_ulat = curr_lat * 1000000
+ curr_bw = BW * ((random.random() - 0.5) * 0.1 + 1)
+ iops = estimate_iops(sz, curr_bw, curr_lat)
+ bw = iops * sz
+
+ res = {'ctx': 10683,
+ 'error': 0,
+ 'groupid': 0,
+ 'jobname': name,
+ 'majf': 0,
+ 'minf': 30,
+ 'read': {'bw': 0,
+ 'bw_agg': 0.0,
+ 'bw_dev': 0.0,
+ 'bw_max': 0,
+ 'bw_mean': 0.0,
+ 'bw_min': 0,
+ 'clat': {'max': 0,
+ 'mean': 0.0,
+ 'min': 0,
+ 'stddev': 0.0},
+ 'io_bytes': 0,
+ 'iops': 0,
+ 'lat': {'max': 0, 'mean': 0.0,
+ 'min': 0, 'stddev': 0.0},
+ 'runtime': 0,
+ 'slat': {'max': 0, 'mean': 0.0,
+ 'min': 0, 'stddev': 0.0}
+ },
+ 'sys_cpu': 0.64,
+ 'trim': {'bw': 0,
+ 'bw_agg': 0.0,
+ 'bw_dev': 0.0,
+ 'bw_max': 0,
+ 'bw_mean': 0.0,
+ 'bw_min': 0,
+ 'clat': {'max': 0,
+ 'mean': 0.0,
+ 'min': 0,
+ 'stddev': 0.0},
+ 'io_bytes': 0,
+ 'iops': 0,
+ 'lat': {'max': 0, 'mean': 0.0,
+ 'min': 0, 'stddev': 0.0},
+ 'runtime': 0,
+ 'slat': {'max': 0, 'mean': 0.0,
+ 'min': 0, 'stddev': 0.0}
+ },
+ 'usr_cpu': 0.23,
+ 'write': {'bw': 0,
+ 'bw_agg': 0,
+ 'bw_dev': 0,
+ 'bw_max': 0,
+ 'bw_mean': 0,
+ 'bw_min': 0,
+ 'clat': {'max': 0, 'mean': 0,
+ 'min': 0, 'stddev': 0},
+ 'io_bytes': 0,
+ 'iops': 0,
+ 'lat': {'max': 0, 'mean': 0,
+ 'min': 0, 'stddev': 0},
+ 'runtime': 0,
+ 'slat': {'max': 0, 'mean': 0.0,
+ 'min': 0, 'stddev': 0.0}
+ }
+ }
+
+ if cfg['rw'] in ('read', 'randread'):
+ key = 'read'
+ elif cfg['rw'] in ('write', 'randwrite'):
+ key = 'write'
+ else:
+ raise ValueError("Uknown op type {0}".format(key))
+
+ res[key]['bw'] = bw
+ res[key]['iops'] = iops
+ res[key]['runtime'] = 30
+ res[key]['io_bytes'] = res[key]['runtime'] * bw
+ res[key]['bw_agg'] = bw
+ res[key]['bw_dev'] = bw / 30
+ res[key]['bw_max'] = bw * 1.5
+ res[key]['bw_min'] = bw / 1.5
+ res[key]['bw_mean'] = bw
+ res[key]['clat'] = {'max': curr_ulat * 10, 'mean': curr_ulat,
+ 'min': curr_ulat / 2, 'stddev': curr_ulat}
+ res[key]['lat'] = res[key]['clat'].copy()
+ res[key]['slat'] = res[key]['clat'].copy()
+
+ parsed_out.append(res)
+
+ return zip(parsed_out, bconf)
+
+
+def do_run_fio(bconf):
+ benchmark_config = format_fio_config(bconf)
+ cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
+ p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+
+ # TODO: set a timeout on the fio run
+ raw_out, _ = p.communicate(benchmark_config)
+
+ try:
+ parsed_out = json.loads(raw_out)["jobs"]
+ except KeyError:
+ msg = "Can't parse fio output {0!r}: no 'jobs' found"
+ raw_out = raw_out[:100]
+ raise ValueError(msg.format(raw_out))
+
+ except Exception as exc:
+ msg = "Can't parse fio output: {0!r}\nError: {1}"
+ raw_out = raw_out[:100]
+ raise ValueError(msg.format(raw_out, exc.message))
+
+ return zip(parsed_out, bconf)
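+
+# do_run_fio() feeds the generated config via stdin and reads JSON back from
+# stdout; roughly equivalent to (illustrative):
+#   fio --output-format=json --alloc-size=262144 - < job.fio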
+
+# limited by fio
+MAX_JOBS = 1000
+
+
+def next_test_portion(whole_conf, runcycle):
+ jcount = 0
+ runtime = 0
+ bconf = []
+
+ for pos, (name, sec) in enumerate(whole_conf):
+ jc = int(sec.get('numjobs', '1'))
+
+ if runcycle is not None:
+ curr_task_time = calculate_execution_time([(name, sec)])
+ else:
+ curr_task_time = 0
+
+ if jc > MAX_JOBS:
+ err_templ = "Can't process job {0!r} - too large numjobs"
+ raise ValueError(err_templ.format(name))
+
+ if runcycle is not None and len(bconf) != 0:
+ rc_ok = curr_task_time + runtime <= runcycle
+ else:
+ rc_ok = True
+
+ if jc + jcount <= MAX_JOBS and rc_ok:
+ runtime += curr_task_time
+ jcount += jc
+ bconf.append((name, sec))
+ if '_ramp_time' in sec:
+ del sec['_ramp_time']
+ continue
+
+ assert len(bconf) != 0
+ yield bconf
+
+ if '_ramp_time' in sec:
+ sec['ramp_time'] = sec.pop('_ramp_time')
+ curr_task_time = calculate_execution_time([(name, sec)])
+
+ runtime = curr_task_time
+ jcount = jc
+ bconf = [(name, sec)]
+
+ if bconf != []:
+ yield bconf
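+
+# Illustrative example (assumed numbers): with runcycle=60 and three jobs of
+# ~30s each, next_test_portion() yields the first two jobs as one portion and
+# the third as the next, so each fio invocation stays within the cycle length.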
+
+
+def add_job_results(jname, job_output, jconfig, res):
+ if job_output['write']['iops'] != 0:
+ raw_result = job_output['write']
+ else:
+ raw_result = job_output['read']
+
+ if jname not in res:
+ j_res = {}
+ j_res["rw"] = jconfig["rw"]
+ j_res["sync_mode"] = get_test_sync_mode(jconfig)
+ j_res["concurence"] = int(jconfig.get("numjobs", 1))
+ j_res["blocksize"] = jconfig["blocksize"]
+ j_res["jobname"] = job_output["jobname"]
+ j_res["timings"] = [int(jconfig.get("runtime", 0)),
+ int(jconfig.get("ramp_time", 0))]
+ else:
+ j_res = res[jname]
+ assert j_res["rw"] == jconfig["rw"]
+ assert j_res["rw"] == jconfig["rw"]
+ assert j_res["sync_mode"] == get_test_sync_mode(jconfig)
+ assert j_res["concurence"] == int(jconfig.get("numjobs", 1))
+ assert j_res["blocksize"] == jconfig["blocksize"]
+ assert j_res["jobname"] == job_output["jobname"]
+
+ # ramp part is skipped for all tests except the first one
+ # assert j_res["timings"] == (jconfig.get("runtime"),
+ # jconfig.get("ramp_time"))
+
+ def j_app(name, x):
+ j_res.setdefault(name, []).append(x)
+
+ j_app("bw", raw_result["bw"])
+ j_app("iops", raw_result["iops"])
+ j_app("lat", raw_result["lat"]["mean"])
+ j_app("clat", raw_result["clat"]["mean"])
+ j_app("slat", raw_result["slat"]["mean"])
+
+ res[jname] = j_res
+
+
+def compile(benchmark_config, params, runcycle=None):
+ whole_conf = list(parse_fio_config_full(benchmark_config, params))
+ res = ""
+
+ for bconf in next_test_portion(whole_conf, runcycle):
+ res += format_fio_config(bconf)
+
+ return res
+
+
+def run_fio(benchmark_config,
+ params,
+ runcycle=None,
+ raw_results_func=None,
+ skip_tests=0,
+ fake_fio=False):
+
+ whole_conf = list(parse_fio_config_full(benchmark_config, params))
+ whole_conf = whole_conf[skip_tests:]
+ res = {}
+ curr_test_num = skip_tests
+ executed_tests = 0
+ ok = True
+ try:
+ for bconf in next_test_portion(whole_conf, runcycle):
+
+ if fake_fio:
+ res_cfg_it = do_run_fio_fake(bconf)
+ else:
+ res_cfg_it = do_run_fio(bconf)
+
+ res_cfg_it = enumerate(res_cfg_it, curr_test_num)
+
+ for curr_test_num, (job_output, (jname, jconfig)) in res_cfg_it:
+ executed_tests += 1
+ if raw_results_func is not None:
+ raw_results_func(executed_tests,
+ [job_output, jname, jconfig])
+
+ assert jname == job_output["jobname"], \
+ "{0} != {1}".format(jname, job_output["jobname"])
+
+ if jname.startswith('_'):
+ continue
+
+ add_job_results(jname, job_output, jconfig, res)
+
+ msg_template = "Done {0} tests from {1}. ETA: {2}"
+ exec_time = estimate_cfg(benchmark_config, params, curr_test_num)
+
+ print msg_template.format(curr_test_num - skip_tests,
+ len(whole_conf),
+ sec_to_str(exec_time))
+
+ except (SystemExit, KeyboardInterrupt):
+ raise
+
+ except Exception:
+ print "=========== ERROR ============="
+ traceback.print_exc()
+ print "======== END OF ERROR ========="
+ ok = False
+
+ return res, executed_tests, ok
+
+
+def run_benchmark(binary_tp, *argv, **kwargs):
+ if 'fio' == binary_tp:
+ return run_fio(*argv, **kwargs)
+ raise ValueError("Unknown behcnmark {0}".format(binary_tp))
+
+
+def read_config(fd, timeout=10):
+ job_cfg = ""
+ etime = time.time() + timeout
+ while True:
+ wtime = etime - time.time()
+ if wtime <= 0:
+ raise IOError("No config provided")
+
+ r, w, x = select.select([fd], [], [], wtime)
+ if len(r) == 0:
+ raise IOError("No config provided")
+
+ char = fd.read(1)
+ if '' == char:
+ return job_cfg
+
+ job_cfg += char
+
+
+def estimate_cfg(job_cfg, params, skip_tests=0):
+ bconf = list(parse_fio_config_full(job_cfg, params))[skip_tests:]
+ return calculate_execution_time(bconf)
+
+
+def sec_to_str(seconds):
+ h = seconds // 3600
+ m = (seconds % 3600) // 60
+ s = seconds % 60
+ return "{0}:{1:02d}:{2:02d}".format(h, m, s)
+
+
+def parse_args(argv):
+ parser = argparse.ArgumentParser(
+ description="Run fio' and return result")
+ parser.add_argument("--type", metavar="BINARY_TYPE",
+ choices=['fio'], default='fio',
+ help=argparse.SUPPRESS)
+ parser.add_argument("--start-at", metavar="START_AT_UTC", type=int,
+ help="Start execution at START_AT_UTC")
+ parser.add_argument("--json", action="store_true", default=False,
+ help="Json output format")
+ parser.add_argument("--output", default='-', metavar="FILE_PATH",
+ help="Store results to FILE_PATH")
+ parser.add_argument("--estimate", action="store_true", default=False,
+ help="Only estimate task execution time")
+ parser.add_argument("--compile", action="store_true", default=False,
+ help="Compile config file to fio config")
+ parser.add_argument("--num-tests", action="store_true", default=False,
+ help="Show total number of tests")
+ parser.add_argument("--runcycle", type=int, default=None,
+ metavar="MAX_CYCLE_SECONDS",
+ help="Max cycle length in seconds")
+ parser.add_argument("--show-raw-results", action='store_true',
+ default=False, help="Output raw input and results")
+ parser.add_argument("--skip-tests", type=int, default=0, metavar="NUM",
+ help="Skip NUM tests")
+ parser.add_argument("--faked-fio", action='store_true',
+ default=False, help="Emulate fio with 0 test time")
+ parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
+ default=[],
+ help="Provide set of pairs PARAM=VAL to" +
+ "format into job description")
+ parser.add_argument("jobfile")
+ return parser.parse_args(argv)
+
+
+def main(argv):
+ argv_obj = parse_args(argv)
+
+ if argv_obj.jobfile == '-':
+ job_cfg = read_config(sys.stdin)
+ else:
+ job_cfg = open(argv_obj.jobfile).read()
+
+ if argv_obj.output == '-':
+ out_fd = sys.stdout
+ else:
+ out_fd = open(argv_obj.output, "w")
+
+ params = {}
+ for param_val in argv_obj.params:
+ assert '=' in param_val
+ name, val = param_val.split("=", 1)
+ params[name] = val
+
+ if argv_obj.estimate:
+ print sec_to_str(estimate_cfg(job_cfg, params))
+ return 0
+
+ if argv_obj.num_tests or argv_obj.compile:
+ bconf = list(parse_fio_config_full(job_cfg, params))
+ bconf = bconf[argv_obj.skip_tests:]
+
+ if argv_obj.compile:
+ out_fd.write(format_fio_config(bconf))
+ out_fd.write("\n")
+
+ if argv_obj.num_tests:
+ print len(bconf)
+
+ return 0
+
+ if argv_obj.start_at is not None:
+ ctime = time.time()
+ if argv_obj.start_at >= ctime:
+ time.sleep(argv_obj.start_at - ctime)
+
+ def raw_res_func(test_num, data):
+ pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
+ out_fd.write(pref)
+ out_fd.write(json.dumps(data))
+ out_fd.write("\n========= END OF RAW_RESULTS =========\n")
+ out_fd.flush()
+
+ rrfunc = raw_res_func if argv_obj.show_raw_results else None
+
+ stime = time.time()
+ job_res, num_tests, ok = run_benchmark(argv_obj.type,
+ job_cfg,
+ params,
+ argv_obj.runcycle,
+ rrfunc,
+ argv_obj.skip_tests,
+ argv_obj.faked_fio)
+ etime = time.time()
+
+ res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}
+
+ oformat = 'json' if argv_obj.json else 'eval'
+ out_fd.write("\nRun {0} tests in {1} seconds\n".format(num_tests,
+ int(etime - stime)))
+ out_fd.write("========= RESULTS(format={0}) =========\n".format(oformat))
+ if argv_obj.json:
+ out_fd.write(json.dumps(res))
+ else:
+ out_fd.write(pprint.pformat(res) + "\n")
+ out_fd.write("\n========= END OF RESULTS =========\n")
+
+ return 0 if ok else 1
+
+
+def fake_main(x):
+ import yaml
+ time.sleep(60)
+ out_fd = sys.stdout
+ fname = "/tmp/perf_tests/metempirical_alisha/raw_results.yaml"
+ res = yaml.load(open(fname).read())[0][1]
+ out_fd.write("========= RESULTS(format=json) =========\n")
+ out_fd.write(json.dumps(res))
+ out_fd.write("\n========= END OF RESULTS =========\n")
+ return 0
+
+
+if __name__ == '__main__':
+ # exit(fake_main(sys.argv[1:]))
+ exit(main(sys.argv[1:]))
diff --git a/wally/suits/io/formatter.py b/wally/suits/io/formatter.py
new file mode 100644
index 0000000..529b78a
--- /dev/null
+++ b/wally/suits/io/formatter.py
@@ -0,0 +1,53 @@
+import texttable
+
+from wally.utils import ssize_to_b
+from wally.statistic import med_dev
+from wally.suits.io.agent import get_test_summary
+
+
+def key_func(k_data):
+ _, data = k_data
+
+ return (data['rw'],
+ data['sync_mode'],
+ ssize_to_b(data['blocksize']),
+ data['concurence'])
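+
+# key_func sorts results by operation, sync mode, block size and concurrency,
+# so rows that differ only in concurrency end up grouped together in the table.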
+
+
+def format_results_for_console(test_set):
+ """
+ create a table with io performance report
+ for console
+ """
+ tab = texttable.Texttable()
+ tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
+ tab.set_cols_align(["l", "r", "r", "r", "r"])
+
+ prev_k = None
+ items = sorted(test_set['res'].items(), key=key_func)
+
+ for test_name, data in items:
+ curr_k = key_func((test_name, data))[:3]
+
+ if prev_k is not None:
+ if prev_k != curr_k:
+ tab.add_row(["---"] * 5)
+
+ prev_k = curr_k
+
+ descr = get_test_summary(data)
+
+ iops, _ = med_dev(data['iops'])
+ bw, bwdev = med_dev(data['bw'])
+
+ # 3 * sigma
+ dev_perc = int((bwdev * 300) / bw)
+
+ params = (descr, int(iops), int(bw), dev_perc,
+ int(med_dev(data['lat'])[0]) // 1000)
+ tab.add_row(params)
+
+ header = ["Description", "IOPS", "BW KiBps", "Dev * 3 %", "clat ms"]
+ tab.header(header)
+
+ return tab.draw()
diff --git a/wally/suits/io/io_scenario_check_distribution.cfg b/wally/suits/io/io_scenario_check_distribution.cfg
new file mode 100644
index 0000000..6ba3f9f
--- /dev/null
+++ b/wally/suits/io/io_scenario_check_distribution.cfg
@@ -0,0 +1,13 @@
+[distrubution_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+direct=1
+ramp_time=5
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+group_reporting
diff --git a/wally/suits/io/io_scenario_check_linearity.cfg b/wally/suits/io/io_scenario_check_linearity.cfg
new file mode 100644
index 0000000..4017cf3
--- /dev/null
+++ b/wally/suits/io/io_scenario_check_linearity.cfg
@@ -0,0 +1,29 @@
+[defaults]
+NUM_ROUNDS=7
+
+ramp_time=5
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=10Gb
+time_based
+runtime=30
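+
+# Note: the section headers below use the agent's templating (assumed
+# semantics): "{% a, b, c %}" expands an option into one job per listed
+# value, and "[name * {NUM_ROUNDS}]" repeats the section NUM_ROUNDS times.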
+
+# ---------------------------------------------------------------------
+# check read and write linearity. oper_time = func(size)
+# ---------------------------------------------------------------------
+[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
+rw={% randwrite, randread %}
+direct=1
+
+# ---------------------------------------------------------------------
+# check sync write linearity. oper_time = func(size)
+# check sync BW as well
+# ---------------------------------------------------------------------
+[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
+rw=randwrite
+sync=1
+
diff --git a/wally/suits/io/io_scenario_check_th_count.cfg b/wally/suits/io/io_scenario_check_th_count.cfg
new file mode 100644
index 0000000..3d57154
--- /dev/null
+++ b/wally/suits/io/io_scenario_check_th_count.cfg
@@ -0,0 +1,46 @@
+[defaults]
+NUM_ROUNDS=7
+ramp_time=5
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+group_reporting
+numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
+
+# ---------------------------------------------------------------------
+# check different thread count. (latency, bw) = func(th_count)
+#
+# RANDOM R IOPS, DIRECT, should act the same as (4k + randread + sync),
+# just faster. Not sure that we need it
+# 4k + randread + direct
+#
+# RANDOM R/W IOPS
+# 4k + randread + sync
+# 4k + randwrite + sync
+#
+# LINEAR BW
+# 1m + write + direct
+# 1m + read + direct
+#
+# ---------------------------------------------------------------------
+[concurrence_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw={% randread %}
+direct=1
+sync=0
+
+[concurrence_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+direct=0
+sync=1
+
+[concurrence_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=1m
+rw={% write, read %}
+direct=1
+sync=0
diff --git a/wally/suits/io/io_scenario_check_vm_count_ec2.cfg b/wally/suits/io/io_scenario_check_vm_count_ec2.cfg
new file mode 100644
index 0000000..19c9e50
--- /dev/null
+++ b/wally/suits/io/io_scenario_check_vm_count_ec2.cfg
@@ -0,0 +1,29 @@
+[defaults]
+NUM_ROUNDS=7
+ramp_time=5
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+group_reporting
+rate={BW_LIMIT}
+rate_iops={IOPS_LIMIT}
+
+# ---------------------------------------------------------------------
+# check different thread count. (latency, bw) = func(th_count)
+# ---------------------------------------------------------------------
+[vm_count_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 4k, 1m %}
+rw={% randwrite, randread %}
+direct=1
+numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
+
+[vm_count_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=0
+sync=1
+numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
diff --git a/wally/suits/io/io_scenario_check_warmup.cfg b/wally/suits/io/io_scenario_check_warmup.cfg
new file mode 100644
index 0000000..6a9c622
--- /dev/null
+++ b/wally/suits/io/io_scenario_check_warmup.cfg
@@ -0,0 +1,33 @@
+[defaults]
+NUM_ROUNDS=7
+
+ramp_time=5
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+
+# ---------------------------------------------------------------------
+# check test time, no warmup time. iops = func(run_time)
+# ---------------------------------------------------------------------
+[runtime_test_wo_wu_{TEST_SUMM}_{UNIQ} * {NUM_ROUNDS}]
+startdelay=10
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=1
+ramp_time=0
+runtime={% 10, 15, 20, 30, 60, 120 %}
+
+# ---------------------------------------------------------------------
+# check test time, with warmup time. iops = func(run_time)
+# ---------------------------------------------------------------------
+[runtime_test_w_wu_{TEST_SUMM}_{UNIQ} * {NUM_ROUNDS}]
+startdelay=10
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=1
+ramp_time={% 5, 10, 15 %}
+runtime={% 15, 30 %}
diff --git a/wally/suits/io/io_scenario_hdd.cfg b/wally/suits/io/io_scenario_hdd.cfg
new file mode 100644
index 0000000..5e24009
--- /dev/null
+++ b/wally/suits/io/io_scenario_hdd.cfg
@@ -0,0 +1,50 @@
+[defaults]
+wait_for_previous
+group_reporting
+time_based
+buffered=0
+iodepth=1
+softrandommap=1
+filename={FILENAME}
+NUM_ROUNDS=7
+
+ramp_time=5
+size=10Gb
+runtime=30
+
+# ---------------------------------------------------------------------
+# check different thread count, sync mode. (latency, iops) = func(th_count)
+# ---------------------------------------------------------------------
+[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+sync=1
+numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+
+# ---------------------------------------------------------------------
+# check different thread count, direct read mode. (latency, iops) = func(th_count)
+# also check iops for randread
+# ---------------------------------------------------------------------
+[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randread
+direct=1
+numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+
+# ---------------------------------------------------------------------
+# check different thread count, direct read/write mode. (bw, iops) = func(th_count)
+# also check BW for seq read/write.
+# ---------------------------------------------------------------------
+[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=1m
+rw={% read, write %}
+direct=1
+numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+
+# ---------------------------------------------------------------------
+# check IOPS randwrite.
+# ---------------------------------------------------------------------
+[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+direct=1
diff --git a/wally/suits/io/io_scenario_long_test.cfg b/wally/suits/io/io_scenario_long_test.cfg
new file mode 100644
index 0000000..4b0a79d
--- /dev/null
+++ b/wally/suits/io/io_scenario_long_test.cfg
@@ -0,0 +1,28 @@
+[defaults]
+
+# 24h test
+NUM_ROUNDS1=270
+NUM_ROUNDS2=261
+
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=50Gb
+time_based
+runtime=300
+
+# ---------------------------------------------------------------------
+# long-running (24h in total) constant random write load
+# ---------------------------------------------------------------------
+[24h_test * {NUM_ROUNDS1}]
+blocksize=128k
+rw=randwrite
+direct=1
+runtime=30
+
+[24h_test * {NUM_ROUNDS2}]
+blocksize=128k
+rw=randwrite
+direct=1
+
diff --git a/wally/suits/io/results_loader.py b/wally/suits/io/results_loader.py
new file mode 100644
index 0000000..25721eb
--- /dev/null
+++ b/wally/suits/io/results_loader.py
@@ -0,0 +1,55 @@
+import re
+import json
+
+
+from wally.utils import ssize_to_b
+from wally.statistic import med_dev
+
+
+def parse_output(out_err):
+ start_patt = r"(?ims)=+\s+RESULTS\(format=json\)\s+=+"
+ end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
+
+ for block in re.split(start_patt, out_err)[1:]:
+ data, garbage = re.split(end_patt, block)
+ yield json.loads(data.strip())
+
+ start_patt = r"(?ims)=+\s+RESULTS\(format=eval\)\s+=+"
+ end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
+
+ for block in re.split(start_patt, out_err)[1:]:
+ data, garbage = re.split(end_patt, block)
+ yield eval(data.strip())
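+
+# parse_output() expects the markers written by the agent, e.g. (illustrative):
+#   ========= RESULTS(format=json) =========
+#   {"__meta__": {...}, "res": {...}}
+#   ========= END OF RESULTS =========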
+
+
+def filter_data(name_prefix, fields_to_select, **filters):
+ def closure(data):
+ for result in data:
+ if name_prefix is not None:
+ if not result['jobname'].startswith(name_prefix):
+ continue
+
+ for k, v in filters.items():
+ if result.get(k) != v:
+ break
+ else:
+ yield map(result.get, fields_to_select)
+ return closure
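+
+# Usage sketch (field names as produced by load_data below): select 4k
+# results for one test group:
+#   flt = filter_data('concurrence_test_', ('concurence', 'iops_mediana'),
+#                     blocksize='4k')
+#   for concurence, iops in flt(results):
+#       ...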
+
+
+def load_data(raw_data):
+ data = list(parse_output(raw_data))[0]
+
+ for key, val in data['res'].items():
+ val['blocksize_b'] = ssize_to_b(val['blocksize'])
+
+ val['iops_mediana'], val['iops_stddev'] = med_dev(val['iops'])
+ val['bw_mediana'], val['bw_stddev'] = med_dev(val['bw'])
+ val['lat_mediana'], val['lat_stddev'] = med_dev(val['lat'])
+ yield val
+
+
+def load_files(*fnames):
+ for fname in fnames:
+ for i in load_data(open(fname).read()):
+ yield i
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
new file mode 100644
index 0000000..91e9dd5
--- /dev/null
+++ b/wally/suits/itest.py
@@ -0,0 +1,237 @@
+import abc
+import time
+import os.path
+import logging
+
+from wally.ssh_utils import copy_paths, run_over_ssh, delete_file
+from wally.utils import ssize_to_b, open_for_append_or_create, sec_to_str
+
+from . import postgres
+from .io import agent as io_agent
+from .io import formatter as io_formatter
+from .io.results_loader import parse_output
+
+
+logger = logging.getLogger("wally")
+
+
+class IPerfTest(object):
+ def __init__(self, on_result_cb, log_directory=None, node=None):
+ self.on_result_cb = on_result_cb
+ self.log_directory = log_directory
+ self.node = node
+
+ def pre_run(self, conn):
+ pass
+
+ def cleanup(self, conn):
+ pass
+
+ @abc.abstractmethod
+ def run(self, conn, barrier):
+ pass
+
+ @classmethod
+ def format_for_console(cls, data):
+ msg = "{0}.format_for_console".format(cls.__name__)
+ raise NotImplementedError(msg)
+
+
+class TwoScriptTest(IPerfTest):
+ remote_tmp_dir = '/tmp'
+
+ def __init__(self, opts, on_result_cb, log_directory=None, node=None):
+ IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
+ self.opts = opts
+
+ if 'run_script' in self.opts:
+ self.run_script = self.opts['run_script']
+ self.prepare_script = self.opts['prepare_script']
+
+ def get_remote_for_script(self, script):
+ return os.path.join(self.remote_tmp_dir, script.rpartition('/')[2])
+
+ def copy_script(self, conn, src):
+ remote_path = self.get_remote_for_script(src)
+ copy_paths(conn, {src: remote_path})
+ return remote_path
+
+ def pre_run(self, conn):
+ remote_script = self.copy_script(conn, self.prepare_script)
+ cmd = remote_script
+ run_over_ssh(conn, cmd, node=self.node)
+
+ def run(self, conn, barrier):
+ remote_script = self.copy_script(conn, self.run_script)
+ cmd_opts = ' '.join(["%s %s" % (key, val) for key, val
+ in self.opts.items()])
+ cmd = remote_script + ' ' + cmd_opts
+ out_err = run_over_ssh(conn, cmd, node=self.node)
+ self.on_result(out_err, cmd)
+
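+ # parse_results() expects the script to print "key: value" lines,
+ # e.g. (illustrative) "tps: 123.4"; each pair is passed to on_result_cb.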
+ def parse_results(self, out):
+ for line in out.split("\n"):
+ key, separator, value = line.partition(":")
+ if key and value:
+ self.on_result_cb((key, float(value)))
+
+ def on_result(self, out_err, cmd):
+ try:
+ self.parse_results(out_err)
+ except Exception as exc:
+ msg_templ = "Error during postprocessing results: {0!r}. {1}"
+ raise RuntimeError(msg_templ.format(exc.message, out_err))
+
+
+class PgBenchTest(TwoScriptTest):
+ root = os.path.dirname(postgres.__file__)
+ prepare_script = os.path.join(root, "prepare.sh")
+ run_script = os.path.join(root, "run.sh")
+
+
+class IOPerfTest(IPerfTest):
+ io_py_remote = "/tmp/disk_test_agent.py"
+
+ def __init__(self, test_options, on_result_cb,
+ log_directory=None, node=None):
+ IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
+ self.options = test_options
+ self.config_fname = test_options['cfg']
+ self.alive_check_interval = test_options.get('alive_check_interval')
+ self.config_params = test_options.get('params', {})
+ self.tool = test_options.get('tool', 'fio')
+ self.raw_cfg = open(self.config_fname).read()
+ self.configs = list(io_agent.parse_fio_config_full(self.raw_cfg,
+ self.config_params))
+
+ cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
+ raw_res = os.path.join(self.log_directory, "raw_results.txt")
+
+ fio_command_file = open_for_append_or_create(cmd_log)
+ fio_command_file.write(io_agent.compile(self.raw_cfg,
+ self.config_params,
+ None))
+ self.fio_raw_results_file = open_for_append_or_create(raw_res)
+
+ def cleanup(self, conn):
+ delete_file(conn, self.io_py_remote)
+ # TODO: remove temporary files created for the test
+
+ def pre_run(self, conn):
+ try:
+ run_over_ssh(conn, 'which fio', node=self.node)
+ except OSError:
+ # TODO: install fio, if not installed
+ cmd = "sudo apt-get -y install fio"
+
+ for i in range(3):
+ try:
+ run_over_ssh(conn, cmd, node=self.node)
+ break
+ except OSError as err:
+ time.sleep(3)
+ else:
+ raise OSError("Can't install fio - " + err.message)
+
+ local_fname = io_agent.__file__.rsplit('.')[0] + ".py"
+ self.files_to_copy = {local_fname: self.io_py_remote}
+ copy_paths(conn, self.files_to_copy)
+
+ files = {}
+
+ for secname, params in self.configs:
+ sz = ssize_to_b(params['size'])
+ msz = sz / (1024 ** 2)
+ if sz % (1024 ** 2) != 0:
+ msz += 1
+
+ fname = params['filename']
+ files[fname] = max(files.get(fname, 0), msz)
+
+ # logger.warning("dd run DISABLED")
+ # cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+
+ cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
+ for fname, sz in files.items():
+ cmd = cmd_templ.format(fname, 1024 ** 2, sz)
+ run_over_ssh(conn, cmd, timeout=sz, node=self.node)
+
+ def run(self, conn, barrier):
+ cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
+ # cmd_templ = "env python2 {0} --type {1} {2} --json -"
+
+ params = " ".join("{0}={1}".format(k, v)
+ for k, v in self.config_params.items())
+
+ if "" != params:
+ params = "--params " + params
+
+ cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
+ logger.debug("Waiting on barrier")
+
+ exec_time = io_agent.estimate_cfg(self.raw_cfg, self.config_params)
+ exec_time_str = sec_to_str(exec_time)
+
+ try:
+ if barrier.wait():
+ templ = "Test should takes about {0}. Will wait at most {1}"
+ timeout = int(exec_time * 1.1 + 300)
+ logger.info(templ.format(exec_time_str, sec_to_str(timeout)))
+
+ out_err = run_over_ssh(conn, cmd,
+ stdin_data=self.raw_cfg,
+ timeout=timeout,
+ node=self.node)
+ logger.info("Done")
+ finally:
+ barrier.exit()
+
+ self.on_result(out_err, cmd)
+
+ def on_result(self, out_err, cmd):
+ try:
+ for data in parse_output(out_err):
+ self.on_result_cb(data)
+ except Exception as exc:
+ msg_templ = "Error during postprocessing results: {0!r}"
+ raise RuntimeError(msg_templ.format(exc.message))
+
+ def merge_results(self, results):
+ if len(results) == 0:
+ return None
+
+ merged_result = results[0]
+ merged_data = merged_result['res']
+ expected_keys = set(merged_data.keys())
+ mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']
+
+ for res in results[1:]:
+ assert res['__meta__'] == merged_result['__meta__']
+
+ data = res['res']
+ diff = set(data.keys()).symmetric_difference(expected_keys)
+
+ msg = "Difference: {0}".format(",".join(diff))
+ assert len(diff) == 0, msg
+
+ for testname, test_data in data.items():
+ res_test_data = merged_data[testname]
+
+ diff = set(test_data.keys()).symmetric_difference(
+ res_test_data.keys())
+
+ msg = "Difference: {0}".format(",".join(diff))
+ assert len(diff) == 0, msg
+
+ for k, v in test_data.items():
+ if k in mergable_fields:
+ res_test_data[k].extend(v)
+ else:
+ msg = "{0!r} != {1!r}".format(res_test_data[k], v)
+ assert res_test_data[k] == v, msg
+
+ return merged_result
+
+ @classmethod
+ def format_for_console(cls, data):
+ return io_formatter.format_results_for_console(data)
diff --git a/wally/suits/postgres/__init__.py b/wally/suits/postgres/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wally/suits/postgres/__init__.py
diff --git a/wally/suits/postgres/prepare.sh b/wally/suits/postgres/prepare.sh
new file mode 100755
index 0000000..e7ca3bc
--- /dev/null
+++ b/wally/suits/postgres/prepare.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+set -e
+
+if [ ! -d /etc/postgresql ]; then
+ apt-get update
+ apt-get install -y postgresql postgresql-contrib
+ err=$(pg_createcluster 9.3 main --start 2>&1 > /dev/null)
+ if [ $? -ne 0 ]; then
+ echo "There was an error while creating cluster"
+ exit 1
+ fi
+fi
+
+sed -i 's/^local\s\+all\s\+all\s\+peer/local all all trust/g' /etc/postgresql/9.3/main/pg_hba.conf
+sudo sed -i "s/#listen_addresses = 'localhost'/listen_addresses = '*'/g" /etc/postgresql/9.3/main/postgresql.conf
+
+service postgresql restart
+
+exit 0
\ No newline at end of file
diff --git a/wally/suits/postgres/run.sh b/wally/suits/postgres/run.sh
new file mode 100755
index 0000000..daad499
--- /dev/null
+++ b/wally/suits/postgres/run.sh
@@ -0,0 +1,47 @@
+#!/bin/bash
+set -e
+
+while [[ $# > 1 ]]
+do
+key="$1"
+
+case $key in
+ num_clients)
+ CLIENTS="$2"
+ shift
+ ;;
+ transactions_per_client)
+ TRANSACTINOS_PER_CLIENT="$2"
+ shift
+ ;;
+ *)
+ echo "Unknown option $key"
+ exit 1
+ ;;
+esac
+shift
+done
+
+CLIENTS=$(echo $CLIENTS | tr ',' '\n')
+TRANSACTIONS_PER_CLIENT=$(echo $TRANSACTIONS_PER_CLIENT | tr ',' '\n')
+
+
+sudo -u postgres createdb -O postgres pgbench &> /dev/null
+sudo -u postgres pgbench -i -U postgres pgbench &> /dev/null
+
+
+for num_clients in $CLIENTS; do
+ for trans_per_cl in $TRANSACTIONS_PER_CLIENT; do
+ tps_all=''
+ for i in 1 2 3 4 5 6 7 8 9 10; do
+ echo -n "$num_clients $trans_per_cl:"
+ sudo -u postgres pgbench -c $num_clients -n -t $trans_per_cl -j 4 -r -U postgres pgbench |
+ grep "(excluding connections establishing)" | awk {'print $3'}
+ done
+ done
+done
+
+sudo -u postgres dropdb pgbench &> /dev/null
+
+exit 0
+
diff --git a/wally/utils.py b/wally/utils.py
new file mode 100644
index 0000000..60645d4
--- /dev/null
+++ b/wally/utils.py
@@ -0,0 +1,124 @@
+import re
+import os
+import logging
+import threading
+import contextlib
+import subprocess
+
+
+logger = logging.getLogger("wally")
+
+
+def parse_creds(creds):
+ # parse user:passwd@host
+ user, passwd_host = creds.split(":", 1)
+
+ if '@' not in passwd_host:
+ passwd, host = passwd_host, None
+ else:
+ passwd, host = passwd_host.rsplit('@', 1)
+
+ return user, passwd, host
+
+
+class TaksFinished(Exception):
+ pass
+
+
+class Barrier(object):
+ def __init__(self, count):
+ self.count = count
+ self.curr_count = 0
+ self.cond = threading.Condition()
+ self.exited = False
+
+ def wait(self, timeout=None):
+ with self.cond:
+ if self.exited:
+ raise TaksFinished()
+
+ self.curr_count += 1
+ if self.curr_count == self.count:
+ self.curr_count = 0
+ self.cond.notify_all()
+ return True
+ else:
+ self.cond.wait(timeout=timeout)
+ return False
+
+ def exit(self):
+ with self.cond:
+ self.exited = True
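+
+# Usage sketch (illustrative): each test thread calls barrier.wait() before
+# starting the workload; the call returns True in exactly one thread once all
+# `count` threads have arrived, and False in the others.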
+
+
+@contextlib.contextmanager
+def log_error(action, types=(Exception,)):
+ if not action.startswith("!"):
+ logger.debug("Starts : " + action)
+ else:
+ action = action[1:]
+
+ try:
+ yield
+ except Exception as exc:
+ if isinstance(exc, types) and not isinstance(exc, StopIteration):
+ templ = "Error during {0} stage: {1}"
+ logger.debug(templ.format(action, exc.message))
+ raise
+
+
+SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4)
+
+
+def ssize_to_b(ssize):
+ try:
+ ssize = ssize.lower()
+
+ if ssize.endswith("b"):
+ ssize = ssize[:-1]
+ if ssize[-1] in SMAP:
+ return int(ssize[:-1]) * SMAP[ssize[-1]]
+ return int(ssize)
+ except (ValueError, TypeError, AttributeError):
+ raise ValueError("Unknow size format {0!r}".format(ssize))
+
+
+def get_ip_for_target(target_ip):
+ cmd = 'ip route get to'.split(" ") + [target_ip]
+ data = subprocess.Popen(cmd, stdout=subprocess.PIPE).stdout.read()
+
+ rr1 = r'{0} via [.0-9]+ dev (?P<dev>.*?) src (?P<ip>[.0-9]+)$'
+ rr1 = rr1.replace(" ", r'\s+')
+ rr1 = rr1.format(target_ip.replace('.', r'\.'))
+
+ rr2 = r'{0} dev (?P<dev>.*?) src (?P<ip>[.0-9]+)$'
+ rr2 = rr2.replace(" ", r'\s+')
+ rr2 = rr2.format(target_ip.replace('.', r'\.'))
+
+ data_line = data.split("\n")[0].strip()
+ res1 = re.match(rr1, data_line)
+ res2 = re.match(rr2, data_line)
+
+ if res1 is not None:
+ return res1.group('ip')
+
+ if res2 is not None:
+ return res2.group('ip')
+
+ raise OSError("Can't define interface for {0}".format(target_ip))
+
+
+def open_for_append_or_create(fname):
+ if not os.path.exists(fname):
+ return open(fname, "w")
+
+ fd = open(fname, 'r+')
+ fd.seek(0, os.SEEK_END)
+ return fd
+
+
+def sec_to_str(seconds):
+ h = seconds // 3600
+ m = (seconds % 3600) // 60
+ s = seconds % 60
+ return "{0}:{1:02d}:{2:02d}".format(h, m, s)