2.0 refactoring:
* Add type annotations to most functions
* Remove old fio run code, move to RPC/pluggable
* Remove most of the sensors code, will move it to RPC
* Other refactoring
diff --git a/wally/discover/ceph.py b/wally/discover/ceph.py
index 70a6edf..ae06cbc 100644
--- a/wally/discover/ceph.py
+++ b/wally/discover/ceph.py
@@ -1,39 +1,22 @@
""" Collect data about ceph nodes"""
import json
import logging
-import subprocess
+from typing import Iterable
-from .node import Node
-from wally.ssh_utils import connect
+from ..node import NodeInfo, Node
-logger = logging.getLogger("io-perf-tool")
+logger = logging.getLogger("wally.discover")
-def local_execute(cmd):
- return subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True)
-
-
-def ssh_execute(ssh):
- def closure(cmd):
- _, chan, _ = ssh.exec_command(cmd)
- return chan.read()
- return closure
-
-
-def discover_ceph_nodes(ip):
- """ Return list of ceph's nodes ips """
+def discover_ceph_nodes(node: Node) -> Iterable[NodeInfo]:
+ """Return list of ceph's nodes NodeInfo"""
ips = {}
- if ip != 'local':
- executor = ssh_execute(connect(ip))
- else:
- executor = local_execute
-
- osd_ips = get_osds_ips(executor, get_osds_list(executor))
- mon_ips = get_mons_or_mds_ips(executor, "mon")
- mds_ips = get_mons_or_mds_ips(executor, "mds")
+ osd_ips = get_osds_ips(node, get_osds_list(node))
+ mon_ips = get_mons_or_mds_ips(node, "mon")
+ mds_ips = get_mons_or_mds_ips(node, "mds")
for ip in osd_ips:
url = "ssh://%s" % ip
@@ -47,43 +30,35 @@
url = "ssh://%s" % ip
ips.setdefault(url, []).append("ceph-mds")
- return [Node(url, list(roles)) for url, roles in ips.items()]
+ return [NodeInfo(url, set(roles)) for url, roles in ips.items()]
-def get_osds_list(executor):
- """ Get list of osds id"""
- return filter(None, executor("ceph osd ls").split("\n"))
+def get_osds_list(node: Node) -> Iterable[str]:
+ """Get list of osd's id"""
+ return filter(None, node.run("ceph osd ls").split("\n"))
-def get_mons_or_mds_ips(executor, who):
- """ Return mon ip list
- :param who - "mon" or "mds" """
- if who not in ("mon", "mds"):
- raise ValueError(("'%s' in get_mons_or_mds_ips instead" +
- "of mon/mds") % who)
+def get_mons_or_mds_ips(node: Node, who: str) -> Iterable[str]:
+ """Return mon ip list. who - mon/mds"""
+ assert who in ("mon", "mds"), \
+ "{!r} in get_mons_or_mds_ips instead of mon/mds".format(who)
- line_res = executor("ceph {0} dump".format(who)).split("\n")
+ line_res = node.run("ceph {0} dump".format(who)).split("\n")
ips = set()
for line in line_res:
fields = line.split()
-
- # what does fields[1], fields[2] means?
- # make this code looks like:
- # SOME_MENINGFULL_VAR1, SOME_MENINGFULL_VAR2 = line.split()[1:3]
-
if len(fields) > 2 and who in fields[2]:
ips.add(fields[1].split(":")[0])
return ips
-def get_osds_ips(executor, osd_list):
- """ Get osd's ips
- :param osd_list - list of osd names from osd ls command"""
+def get_osds_ips(node: Node, osd_list: Iterable[str]) -> Iterable[str]:
+ """Get osd's ips. osd_list - list of osd names from osd ls command"""
ips = set()
for osd_id in osd_list:
- out = executor("ceph osd find {0}".format(osd_id))
+ out = node.run("ceph osd find {0}".format(osd_id))
ip = json.loads(out)["ip"]
ips.add(str(ip.split(":")[0]))
return ips
diff --git a/wally/discover/discover.py b/wally/discover/discover.py
index 5b9d3fe..233650e 100644
--- a/wally/discover/discover.py
+++ b/wally/discover/discover.py
@@ -1,10 +1,14 @@
import os.path
import logging
+from paramiko import AuthenticationException
+
from . import ceph
from . import fuel
from . import openstack
-from wally.utils import parse_creds
+from ..utils import parse_creds, StopTestError
+from ..test_run_class import TestRun
+from ..node import Node, NodeInfo
logger = logging.getLogger("wally.discover")
@@ -28,12 +32,12 @@
"""
-def discover(ctx, discover, clusters_info, var_dir, discover_nodes=True):
+def discover(testrun: TestRun, discover_cfg, clusters_info, var_dir, discover_nodes=True):
+ """Discover nodes in clusters"""
nodes_to_run = []
clean_data = None
- ctx.fuel_openstack_creds = None
- for cluster in discover:
+ for cluster in discover_cfg:
if cluster == "openstack" and not discover_nodes:
logger.warning("Skip openstack cluster discovery")
elif cluster == "openstack" and discover_nodes:
@@ -64,19 +68,30 @@
if cluster == "fuel_openrc_only":
discover_nodes = False
- res = fuel.discover_fuel_nodes(clusters_info['fuel'],
- var_dir,
+ ssh_creds = clusters_info['fuel']['ssh_creds']
+ fuel_node = Node(NodeInfo(ssh_creds, {'fuel_master'}))
+
+ try:
+ fuel_node.connect_ssh()
+ except AuthenticationException:
+ raise StopTestError("Wrong fuel credentials")
+ except Exception:
+ logger.exception("While connection to FUEL")
+ raise StopTestError("Failed to connect to FUEL")
+
+ fuel_node.connect_rpc()
+
+ res = fuel.discover_fuel_nodes(fuel_node,
+ clusters_info['fuel'],
discover_nodes)
nodes, clean_data, openrc_dict, version = res
- if openrc_dict is None:
- ctx.fuel_openstack_creds = None
- else:
+ if openrc_dict:
if version >= [8, 0] and openrc_dict['os_auth_url'].startswith("https://"):
logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
openrc_dict['os_auth_url'] = "http" + openrc_dict['os_auth_url'][5:]
- ctx.fuel_openstack_creds = {
+ testrun.fuel_openstack_creds = {
'name': openrc_dict['username'],
'passwd': openrc_dict['password'],
'tenant': openrc_dict['tenant_name'],
@@ -91,11 +106,11 @@
fuel_openrc_fname = os.path.join(var_dir,
env_f_name + "_openrc")
- if ctx.fuel_openstack_creds is not None:
+ if testrun.fuel_openstack_creds is not None:
with open(fuel_openrc_fname, "w") as fd:
- fd.write(openrc_templ.format(**ctx.fuel_openstack_creds))
- msg = "Openrc for cluster {0} saves into {1}"
- logger.info(msg.format(env_name, fuel_openrc_fname))
+ fd.write(openrc_templ.format(**testrun.fuel_openstack_creds))
+ msg = "Openrc for cluster {0} saves into {1}"
+ logger.info(msg.format(env_name, fuel_openrc_fname))
nodes_to_run.extend(nodes)
elif cluster == "ceph":
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
index 4f34298..1443a9f 100644
--- a/wally/discover/fuel.py
+++ b/wally/discover/fuel.py
@@ -1,44 +1,41 @@
-import re
import socket
import logging
-from urlparse import urlparse
-
-import sshtunnel
-from paramiko import AuthenticationException
+from typing import Dict, Any, Tuple, List
+from urllib.parse import urlparse
-from wally.fuel_rest_api import (KeystoneAuth, get_cluster_id,
- reflect_cluster, FuelInfo)
-from wally.utils import (parse_creds, check_input_param, StopTestError,
- clean_resource, get_ip_for_target)
-from wally.ssh_utils import (run_over_ssh, connect, set_key_for_node,
- read_from_remote)
-
-from .node import Node
+from .. import fuel_rest_api
+from ..utils import parse_creds, check_input_param
+from ..node import NodeInfo, Node, FuelNodeInfo
logger = logging.getLogger("wally.discover")
-BASE_PF_PORT = 44006
-def discover_fuel_nodes(fuel_data, var_dir, discover_nodes=True):
+def discover_fuel_nodes(fuel_master_node: Node,
+ fuel_data: Dict[str, Any],
+ discover_nodes: bool=True) -> Tuple[List[NodeInfo], FuelNodeInfo]:
+ """Discover nodes in fuel cluster, get openrc for selected cluster"""
+
+ # parse FUEL REST credentials
username, tenant_name, password = parse_creds(fuel_data['creds'])
creds = {"username": username,
"tenant_name": tenant_name,
"password": password}
- conn = KeystoneAuth(fuel_data['url'], creds, headers=None)
-
+ # connect to FUEL
+ conn = fuel_rest_api.KeystoneAuth(fuel_data['url'], creds, headers=None)
msg = "openstack_env should be provided in fuel config"
check_input_param('openstack_env' in fuel_data, msg)
- cluster_id = get_cluster_id(conn, fuel_data['openstack_env'])
- cluster = reflect_cluster(conn, cluster_id)
- version = FuelInfo(conn).get_version()
+ # get cluster information from REST API
+ cluster_id = fuel_rest_api.get_cluster_id(conn, fuel_data['openstack_env'])
+ cluster = fuel_rest_api.reflect_cluster(conn, cluster_id)
+ version = fuel_rest_api.FuelInfo(conn).get_version()
if not discover_nodes:
logger.warning("Skip fuel cluster discovery")
- return ([], None, cluster.get_openrc(), version)
+ return [], FuelNodeInfo(version, None, cluster.get_openrc())
fuel_nodes = list(cluster.get_nodes())
@@ -46,85 +43,27 @@
network = 'fuelweb_admin' if version >= [6, 0] else 'admin'
- ssh_creds = fuel_data['ssh_creds']
-
fuel_host = urlparse(fuel_data['url']).hostname
fuel_ip = socket.gethostbyname(fuel_host)
+ fuel_ext_iface = fuel_master_node.get_interface(fuel_ip)
- try:
- ssh_conn = connect("{0}@{1}".format(ssh_creds, fuel_host))
- except AuthenticationException:
- raise StopTestError("Wrong fuel credentials")
- except Exception:
- logger.exception("While connection to FUEL")
- raise StopTestError("Failed to connect to FUEL")
-
- fuel_ext_iface = get_external_interface(ssh_conn, fuel_ip)
-
+ # get FUEL master key to connect to cluster nodes via ssh
logger.debug("Downloading fuel master key")
- fuel_key = download_master_key(ssh_conn)
+ fuel_key = fuel_master_node.get_file_content('/root/.ssh/id_rsa')
+
+ # forward ports of cluster nodes to FUEL master
+ logger.info("Forwarding ssh ports from FUEL nodes to localhost")
+ ips = [str(fuel_node.get_ip(network)) for fuel_node in fuel_nodes]
+ port_fw = [fuel_master_node.forward_port(ip, 22) for ip in ips]
+ listen_ip = fuel_master_node.get_ip()
nodes = []
- ips_ports = []
-
- logger.info("Forwarding ssh ports from FUEL nodes to localhost")
- fuel_usr, fuel_passwd = ssh_creds.split(":", 1)
- ips = [str(fuel_node.get_ip(network)) for fuel_node in fuel_nodes]
- port_fw = forward_ssh_ports(fuel_host, fuel_usr, fuel_passwd, ips)
- listen_ip = get_ip_for_target(fuel_host)
-
for port, fuel_node, ip in zip(port_fw, fuel_nodes, ips):
- logger.debug(
- "SSH port forwarding {0} => localhost:{1}".format(ip, port))
+ logger.debug("SSH port forwarding {} => {}:{}".format(ip, listen_ip, port))
+ conn_url = "ssh://root@{}:{}".format(listen_ip, port)
+ nodes.append(NodeInfo(conn_url, fuel_node['roles'], listen_ip, fuel_key))
- conn_url = "ssh://root@127.0.0.1:{0}".format(port)
- set_key_for_node(('127.0.0.1', port), fuel_key)
+ logger.debug("Found {} fuel nodes for env {}".format(len(nodes), fuel_data['openstack_env']))
- node = Node(conn_url, fuel_node['roles'])
- node.monitor_ip = listen_ip
- nodes.append(node)
- ips_ports.append((ip, port))
+ return nodes, FuelNodeInfo(version, fuel_ext_iface, cluster.get_openrc())
- logger.debug("Found %s fuel nodes for env %r" %
- (len(nodes), fuel_data['openstack_env']))
-
- return (nodes,
- (ssh_conn, fuel_ext_iface, ips_ports),
- cluster.get_openrc(),
- version)
-
-
-def download_master_key(conn):
- # download master key
- with conn.open_sftp() as sftp:
- return read_from_remote(sftp, '/root/.ssh/id_rsa')
-
-
-def get_external_interface(conn, ip):
- data = run_over_ssh(conn, "ip a", node='fuel-master', nolog=True)
- curr_iface = None
- for line in data.split("\n"):
-
- match1 = re.match(r"\d+:\s+(?P<name>.*?):\s\<", line)
- if match1 is not None:
- curr_iface = match1.group('name')
-
- match2 = re.match(r"\s+inet\s+(?P<ip>[0-9.]+)/", line)
- if match2 is not None:
- if match2.group('ip') == ip:
- assert curr_iface is not None
- return curr_iface
- raise KeyError("Can't found interface for ip {0}".format(ip))
-
-
-def forward_ssh_ports(proxy_ip, proxy_user, proxy_passwd, ips):
- for ip in ips:
- tunnel = sshtunnel.open(
- (proxy_ip, 22),
- ssh_username=proxy_user,
- ssh_password=proxy_passwd,
- threaded=True,
- remote_bind_address=(ip, 22))
- tunnel.__enter__()
- clean_resource(tunnel.__exit__, None, None, None)
- yield tunnel.local_bind_port
diff --git a/wally/discover/node.py b/wally/discover/node.py
deleted file mode 100644
index a3d58f9..0000000
--- a/wally/discover/node.py
+++ /dev/null
@@ -1,46 +0,0 @@
-import getpass
-
-from wally.ssh_utils import parse_ssh_uri
-
-
-class Node(object):
-
- def __init__(self, conn_url, roles):
- self.roles = roles
- self.conn_url = conn_url
- self.connection = None
- self.monitor_ip = None
- self.os_vm_id = None
-
- def get_ip(self):
- if self.conn_url == 'local':
- return '127.0.0.1'
-
- assert self.conn_url.startswith("ssh://")
- return parse_ssh_uri(self.conn_url[6:]).host
-
- def get_conn_id(self):
- if self.conn_url == 'local':
- return '127.0.0.1'
-
- assert self.conn_url.startswith("ssh://")
- creds = parse_ssh_uri(self.conn_url[6:])
- return "{0.host}:{0.port}".format(creds)
-
- def get_user(self):
- if self.conn_url == 'local':
- return getpass.getuser()
-
- assert self.conn_url.startswith("ssh://")
- creds = parse_ssh_uri(self.conn_url[6:])
- return creds.user
-
- def __str__(self):
- templ = "<Node: url={conn_url!r} roles={roles}" + \
- " connected={is_connected}>"
- return templ.format(conn_url=self.conn_url,
- roles=", ".join(self.roles),
- is_connected=self.connection is not None)
-
- def __repr__(self):
- return str(self)
diff --git a/wally/discover/openstack.py b/wally/discover/openstack.py
index 32b4629..bb128c3 100644
--- a/wally/discover/openstack.py
+++ b/wally/discover/openstack.py
@@ -1,42 +1,63 @@
import socket
import logging
+from typing import Iterable, Dict, Any
from novaclient.client import Client
-from .node import Node
+from ..node import NodeInfo
from wally.utils import parse_creds
-logger = logging.getLogger("io-perf-tool.discover")
+logger = logging.getLogger("wally.discover")
-def get_floating_ip(vm):
- addrs = vm.addresses
- for net_name, ifaces in addrs.items():
+def get_floating_ip(vm) -> str:
+ """Get VM floating IP address"""
+
+ for net_name, ifaces in vm.addresses.items():
for iface in ifaces:
if iface.get('OS-EXT-IPS:type') == "floating":
return iface['addr']
+ raise ValueError("VM {} has no floating ip".format(vm))
-def discover_vms(client, search_opts):
+
+def get_ssh_url(user: str, password: str, ip: str, key: str) -> str:
+ """Get ssh connection URL from parts"""
+
+ if password is not None:
+ assert key is None, "Both key and password provided"
+ return "ssh://{}:{}@{}".format(user, password, ip)
+ else:
+ assert key is not None, "None of key/password provided"
+ return "ssh://{}@{}::{}".format(user, ip, key)
+
+
+def discover_vms(client: Client, search_opts) -> Iterable[NodeInfo]:
+ """Discover virtual machines"""
user, password, key = parse_creds(search_opts.pop('auth'))
servers = client.servers.list(search_opts=search_opts)
logger.debug("Found %s openstack vms" % len(servers))
- return [Node(get_floating_ip(server), ["test_vm"], username=user,
- password=password, key_path=key)
- for server in servers if get_floating_ip(server)]
+
+ nodes = []
+ for server in servers:
+ ip = get_floating_ip(server)
+ nodes.append(NodeInfo(get_ssh_url(user, password, ip, key), ["test_vm"]))
+
+ return nodes
-def discover_services(client, opts):
+def discover_services(client: Client, opts: Dict[str, Any]) -> Iterable[NodeInfo]:
+ """Discover openstack services for given cluster"""
user, password, key = parse_creds(opts.pop('auth'))
services = []
if opts['service'] == "all":
services = client.services.list()
else:
- if isinstance(opts['service'], basestring):
+ if isinstance(opts['service'], str):
opts['service'] = [opts['service']]
for s in opts['service']:
@@ -50,21 +71,25 @@
logger.debug("Found %s openstack service nodes" %
len(host_services_mapping))
- return [Node(host, services, username=user,
- password=password, key_path=key) for host, services in
- host_services_mapping.items()]
+ nodes = []
+ for host, services in host_services_mapping.items():
+ ssh_url = get_ssh_url(user, password, host, key)
+ nodes.append(NodeInfo(ssh_url, services))
+
+ return nodes
-def discover_openstack_nodes(conn_details, conf):
+def discover_openstack_nodes(conn_details: Dict[str, str], conf: Dict[str, Any]) -> Iterable[NodeInfo]:
"""Discover vms running in openstack
- :param conn_details - dict with openstack connection details -
+ conn_details - dict with openstack connection details -
auth_url, api_key (password), username
+ conf - test configuration object
"""
client = Client(version='1.1', **conn_details)
- nodes = []
+
if conf.get('discover'):
services_to_discover = conf['discover'].get('nodes')
if services_to_discover:
- nodes.extend(discover_services(client, services_to_discover))
+ return discover_services(client, services_to_discover)
- return nodes
+ return []