very large refactoring
diff --git a/wally/discover/__init__.py b/wally/discover/__init__.py
new file mode 100644
index 0000000..b02809c
--- /dev/null
+++ b/wally/discover/__init__.py
@@ -0,0 +1,5 @@
+"this package contains node discovery code"
+from .discover import discover, undiscover
+from .node import Node
+
+__all__ = ["discover", "Node", "undiscover"]
diff --git a/wally/discover/ceph.py b/wally/discover/ceph.py
new file mode 100644
index 0000000..70a6edf
--- /dev/null
+++ b/wally/discover/ceph.py
@@ -0,0 +1,89 @@
+""" Collect data about ceph nodes"""
+import json
+import logging
+import subprocess
+
+
+from .node import Node
+from wally.ssh_utils import connect
+
+
+logger = logging.getLogger("io-perf-tool")
+
+
+def local_execute(cmd):  # runs `cmd` through the local shell, stderr included
+    return subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
+
+
+def ssh_execute(ssh):
+    """Build an executor: cmd -> data read from ssh.exec_command(cmd)[1]."""
+    def run_remote(cmd):
+        return ssh.exec_command(cmd)[1].read()
+    return run_remote
+
+
+def discover_ceph_nodes(ip):
+    """Discover ceph osd/mon/mds hosts and return them as Node objects.
+
+    :param ip: 'local' to run the ceph CLI on this machine, anything else
+               is passed to connect() and the CLI is run over that ssh link
+    :return: list of Node, one per distinct "ssh://<ip>" url, carrying
+             every ceph role (ceph-osd / ceph-mon / ceph-mds) seen on it
+    """
+    if ip != 'local':
+        executor = ssh_execute(connect(ip))
+    else:
+        executor = local_execute
+
+    # One (role, addresses) pair per ceph daemon type.
+    found = [
+        ("ceph-osd", get_osds_ips(executor, get_osds_list(executor))),
+        ("ceph-mon", get_mons_or_mds_ips(executor, "mon")),
+        ("ceph-mds", get_mons_or_mds_ips(executor, "mds")),
+    ]
+
+    roles_by_url = {}
+    for role, addrs in found:
+        for addr in addrs:  # not named `ip`: keep the argument intact
+            roles_by_url.setdefault("ssh://%s" % addr, []).append(role)
+
+    return [Node(url, list(roles)) for url, roles in roles_by_url.items()]
+
+
+def get_osds_list(executor):
+    """Return the non-empty lines of `ceph osd ls` output (the osd ids)."""
+    return [osd for osd in executor("ceph osd ls").split("\n") if osd]
+
+
+def get_mons_or_mds_ips(executor, who):
+    """Return the set of ips found in `ceph mon dump` / `ceph mds dump`.
+
+    :param executor: callable taking a shell command, returning its output
+    :param who: "mon" or "mds"; any other value raises ValueError
+    """
+    if who not in ("mon", "mds"):
+        raise ValueError(("'%s' in get_mons_or_mds_ips instead " +
+                          "of mon/mds") % who)
+
+    ips = set()
+    for line in executor("ceph {0} dump".format(who)).split("\n"):
+        fields = line.split()
+        if len(fields) <= 2:
+            continue
+        # Presumably "<rank>: <ip>:<port>/<n> <who>.<id>" rows - TODO confirm.
+        endpoint, daemon = fields[1:3]
+        if who in daemon:
+            ips.add(endpoint.split(":")[0])
+
+    return ips
+
+
+def get_osds_ips(executor, osd_list):
+    """Collect the distinct host ips reported by `ceph osd find <id>`.
+
+    :param executor: callable running a shell command and returning output
+    :param osd_list: osd ids, as printed by `ceph osd ls`
+    """
+    replies = (executor("ceph osd find {0}".format(n)) for n in osd_list)
+    hosts = (json.loads(reply)["ip"].split(":")[0] for reply in replies)
+    return set(str(host) for host in hosts)
diff --git a/wally/discover/discover.py b/wally/discover/discover.py
new file mode 100644
index 0000000..8c78387
--- /dev/null
+++ b/wally/discover/discover.py
@@ -0,0 +1,64 @@
+import logging
+
+from . import ceph
+from . import fuel
+from . import openstack
+from wally.utils import parse_creds
+
+
+logger = logging.getLogger("wally.discover")
+
+
+def discover(ctx, discover, clusters_info, var_dir):
+    """Discover nodes of every cluster type listed in `discover`.
+
+    :param ctx: gets fuel_openstack_creds set when "fuel" is discovered
+    :param discover: iterable of "openstack", "fuel" and/or "ceph"
+    :param clusters_info: dict of per-cluster settings, keyed by type
+    :param var_dir: directory passed on to the fuel discovery code
+    :return: (nodes, clean_data); clean_data is None unless fuel ran
+    :raises ValueError: for an unknown cluster type
+    """
+    nodes_to_run = []
+    clean_data = None
+    for cluster in discover:
+        if cluster == "openstack":
+            cluster_info = clusters_info["openstack"]
+            conn = cluster_info['connection']
+            # Guard first: conn['creds'] below would crash on an empty conn.
+            if not conn:
+                logger.error("No connection provided for %s. Skipping"
+                             % cluster)
+                continue
+
+            user, passwd, tenant = parse_creds(conn['creds'])
+            auth_data = dict(auth_url=conn['auth_url'], username=user,
+                             api_key=passwd, project_id=tenant)
+
+            logger.debug("Discovering openstack nodes "
+                         "with connection details: %r" % conn)
+            nodes_to_run.extend(
+                openstack.discover_openstack_nodes(auth_data, cluster_info))
+
+        elif cluster == "fuel":
+            res = fuel.discover_fuel_nodes(clusters_info['fuel'], var_dir)
+            nodes, clean_data, openrc_dict = res
+            ctx.fuel_openstack_creds = {'name': openrc_dict['username'],
+                                        'passwd': openrc_dict['password'],
+                                        'tenant': openrc_dict['tenant_name'],
+                                        'auth_url': openrc_dict['os_auth_url']}
+            nodes_to_run.extend(nodes)
+
+        elif cluster == "ceph":
+            cluster_info = clusters_info["ceph"]
+            nodes_to_run.extend(ceph.discover_ceph_nodes(cluster_info))
+        else:
+            msg_templ = "Unknown cluster type in 'discover' parameter: {0!r}"
+            raise ValueError(msg_templ.format(cluster))
+
+    return nodes_to_run, clean_data
+
+
+def undiscover(clean_data):  # revert what discover() set up for fuel
+ if clean_data is not None:  # None: discover() ran no fuel discovery
+ fuel.clean_fuel_port_forwarding(clean_data)
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
new file mode 100644
index 0000000..6e76188
--- /dev/null
+++ b/wally/discover/fuel.py
@@ -0,0 +1,136 @@
+import os
+import re
+import sys
+import socket
+import logging
+from urlparse import urlparse
+
+import yaml
+from wally.fuel_rest_api import (KeystoneAuth, get_cluster_id,
+ reflect_cluster, FuelInfo)
+from wally.utils import parse_creds
+from wally.ssh_utils import run_over_ssh, connect
+
+from .node import Node
+
+
+logger = logging.getLogger("wally.discover")
+BASE_PF_PORT = 33467
+
+
+def discover_fuel_nodes(fuel_data, var_dir):
+    """Discover the nodes of a FUEL env and forward an ssh port to each.
+
+    :param fuel_data: dict with 'creds', 'url', 'openstack_env', 'ssh_creds'
+    :param var_dir: directory the master node's ssh key is downloaded to
+    :return: (nodes, clean_data, openrc): list of Node, the tuple taken by
+             clean_fuel_port_forwarding() and cluster.get_openrc()
+    """
+    # NOTE(review): discover.py unpacks parse_creds() as (user, passwd,
+    # tenant); the order used here differs - confirm which one is right.
+    username, tenant_name, password = parse_creds(fuel_data['creds'])
+    creds = {"username": username,
+             "tenant_name": tenant_name,
+             "password": password}
+
+    conn = KeystoneAuth(fuel_data['url'], creds, headers=None)
+    cluster_id = get_cluster_id(conn, fuel_data['openstack_env'])
+    cluster = reflect_cluster(conn, cluster_id)
+    version = FuelInfo(conn).get_version()
+    fuel_nodes = list(cluster.get_nodes())
+
+    logger.debug("Found FUEL {0}".format(".".join(map(str, version))))
+
+    network = 'fuelweb_admin' if version >= [6, 0] else 'admin'
+    ssh_creds = fuel_data['ssh_creds']
+
+    fuel_host = urlparse(fuel_data['url']).hostname
+    fuel_ip = socket.gethostbyname(fuel_host)
+    ssh_conn = connect("{0}@@{1}".format(ssh_creds, fuel_host))
+    fuel_ext_iface = get_external_interface(ssh_conn, fuel_ip)
+
+    # TODO: keep ssh key in memory
+    # http://stackoverflow.com/questions/11994139/how-to-include-the-private-key-in-paramiko-after-fetching-from-string
+    fuel_key_file = os.path.join(var_dir, "fuel_master_node_id_rsa")
+    download_master_key(ssh_conn, fuel_key_file)
+
+    nodes = []
+    ports = range(BASE_PF_PORT, BASE_PF_PORT + len(fuel_nodes))
+    ips_ports = []
+
+    for fuel_node, port in zip(fuel_nodes, ports):
+        ip = fuel_node.get_ip(network)
+        forward_ssh_port(ssh_conn, fuel_ext_iface, port, ip)
+        conn_url = "ssh://root@{0}:{1}:{2}".format(fuel_host,
+                                                   port,
+                                                   fuel_key_file)
+        nodes.append(Node(conn_url, fuel_node['roles']))
+        ips_ports.append((ip, port))
+
+    logger.debug("Found %s fuel nodes for env %r" %
+                 (len(nodes), fuel_data['openstack_env']))
+
+    return (nodes,
+            (ssh_conn, fuel_ext_iface, ips_ports),
+            cluster.get_openrc())
+
+
+def download_master_key(conn, dest):
+    """Copy /root/.ssh/id_rsa from `conn`'s host to `dest` (mode 0400)."""
+    from contextlib import closing  # local: not among the file's imports
+    with closing(conn.open_sftp()) as sftp:  # closed even if get() raises
+        sftp.get('/root/.ssh/id_rsa', dest)
+    os.chmod(dest, 0o400)
+
+    logger.debug("Fuel master key stored in {0}".format(dest))
+
+
+def get_external_interface(conn, ip):
+    """Return the name of the interface that `ip a` lists `ip` under.
+
+    `ip a` is run over `conn`; KeyError is raised when `ip` is not found.
+    """
+    iface_name = None
+    for row in run_over_ssh(conn, "ip a", node='fuel-master').split("\n"):
+        header = re.match(r"\d+:\s+(?P<name>.*?):\s\<", row)
+        if header is not None:
+            iface_name = header.group('name')
+        inet = re.match(r"\s+inet\s+(?P<ip>[0-9.]+)/", row)
+        if inet is not None and inet.group('ip') == ip:
+            assert iface_name is not None
+            return iface_name
+    raise KeyError("Can't found interface for ip {0}".format(ip))
+
+
+def forward_ssh_port(conn, iface, new_port, ip, clean=False):
+    """Add (or, with clean=True, delete) a DNAT rule: new_port -> ip:22."""
+    rule = ("iptables -t nat {mode} PREROUTING -p tcp "
+            "-i {iface} --dport {port} -j DNAT --to {ip}:22")
+    action = "-A" if clean is not True else "-D"
+    shell_cmd = rule.format(mode=action, iface=iface, port=new_port, ip=ip)
+    run_over_ssh(conn, shell_cmd, node='fuel-master')
+
+
+def clean_fuel_port_forwarding(clean_data):  # drop every forwarded port
+    master, ext_iface, forwarded = clean_data
+    for node_ip, fwd_port in forwarded:
+        forward_ssh_port(master, ext_iface, fwd_port, node_ip, clean=True)
+
+
+def main(argv):
+    """Discover fuel nodes from the yaml file argv[0], wait, then clean up."""
+    with open(argv[0]) as fd:
+        fuel_data = yaml.load(fd.read())['clouds']['fuel']
+    nodes, to_clean, openrc = discover_fuel_nodes(fuel_data, '/tmp')
+    try:
+        print(nodes)
+        print(openrc)
+        print("Ready to test")
+        sys.stdin.readline()  # forwards stay up until Enter is pressed
+    finally:
+        clean_fuel_port_forwarding(to_clean)
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main(sys.argv[1:]))
diff --git a/wally/discover/node.py b/wally/discover/node.py
new file mode 100644
index 0000000..fad4f29
--- /dev/null
+++ b/wally/discover/node.py
@@ -0,0 +1,22 @@
+import urlparse
+
+
+class Node(object):
+    """A host reachable through `conn_url` plus the roles it plays."""
+
+    def __init__(self, conn_url, roles):
+        self.conn_url = conn_url
+        self.roles = roles
+        self.connection = None  # None is reported as "not connected"
+
+    def get_ip(self):  # hostname part of conn_url
+        return urlparse.urlparse(self.conn_url).hostname
+
+    def __str__(self):
+        roles, linked = ", ".join(self.roles), self.connection is not None
+        return ("<Node: url={conn_url!r} roles={roles}"
+                " connected={is_connected}>").format(
+                    conn_url=self.conn_url, roles=roles, is_connected=linked)
+
+    def __repr__(self):
+        return str(self)
diff --git a/wally/discover/openstack.py b/wally/discover/openstack.py
new file mode 100644
index 0000000..32b4629
--- /dev/null
+++ b/wally/discover/openstack.py
@@ -0,0 +1,70 @@
+import socket
+import logging
+
+
+from novaclient.client import Client
+
+from .node import Node
+from wally.utils import parse_creds
+
+
+logger = logging.getLogger("io-perf-tool.discover")
+
+
+def get_floating_ip(vm):
+    """Return the first floating address of `vm`, or None if it has none."""
+    for ifaces in vm.addresses.values():
+        for iface in ifaces:
+            if iface.get('OS-EXT-IPS:type') == "floating":
+                return iface['addr']
+
+
+def discover_vms(client, search_opts):  # Node per vm that has a floating ip
+    user, password, key = parse_creds(search_opts.pop('auth'))
+    servers = client.servers.list(search_opts=search_opts)
+    logger.debug("Found %s openstack vms" % len(servers))
+    # NOTE(review): Node.__init__ is (conn_url, roles) only - kwargs below?
+    return [Node(addr, ["test_vm"], username=user,
+                 password=password, key_path=key)
+            for addr in map(get_floating_ip, servers) if addr]
+
+
+def discover_services(client, opts):
+    """Return a Node for every host running the requested services.
+
+    opts: 'auth' - creds string (popped), 'service' - "all", name or list.
+    """
+    user, password, key = parse_creds(opts.pop('auth'))
+    if opts['service'] == "all":
+        services = client.services.list()
+    else:
+        if isinstance(opts['service'], basestring):
+            opts['service'] = [opts['service']]
+        services = []
+        for binary in opts['service']:
+            services.extend(client.services.list(binary=binary))
+
+    by_host = {}
+    for service in services:
+        ip = socket.gethostbyname(service.host)
+        # setdefault: a plain dict lookup raised KeyError for each new ip
+        by_host.setdefault(ip, []).append(service.binary)
+    logger.debug("Found %s openstack service nodes" % len(by_host))
+    # NOTE(review): Node.__init__ takes (conn_url, roles) only - confirm
+    return [Node(host, binaries, username=user, password=password,
+                 key_path=key) for host, binaries in by_host.items()]
+
+
+def discover_openstack_nodes(conn_details, conf):
+    """Discover the openstack service nodes requested in `conf`.
+
+    :param conn_details: dict of novaclient Client keyword arguments -
+                         auth_url, api_key (password), username
+    :param conf: dict; a non-empty conf['discover']['nodes'] is discovered
+    :return: list of nodes, empty when nothing was requested
+    """
+    client = Client(version='1.1', **conn_details)
+    requested = (conf.get('discover') or {}).get('nodes')
+    if not requested:
+        return []
+    return list(discover_services(client, requested))