refactoring is on the way
diff --git a/wally/discover/ceph.py b/wally/discover/ceph.py
index ae06cbc..4a72bfb 100644
--- a/wally/discover/ceph.py
+++ b/wally/discover/ceph.py
@@ -1,64 +1,91 @@
""" Collect data about ceph nodes"""
import json
import logging
-from typing import Iterable
+from typing import List, Set, Dict
-from ..node import NodeInfo, Node
-
+from ..node_interfaces import NodeInfo, IRPCNode
+from ..ssh_utils import ConnCreds
+from ..common_types import IP
logger = logging.getLogger("wally.discover")
-def discover_ceph_nodes(node: Node) -> Iterable[NodeInfo]:
+def discover_ceph_nodes(node: IRPCNode,
+ cluster: str = "ceph",
+ conf: str = None,
+ key: str = None) -> List[NodeInfo]:
"""Return list of ceph's nodes NodeInfo"""
- ips = {}
- osd_ips = get_osds_ips(node, get_osds_list(node))
- mon_ips = get_mons_or_mds_ips(node, "mon")
- mds_ips = get_mons_or_mds_ips(node, "mds")
+ if conf is None:
+ conf = "/etc/ceph/{}.conf".format(cluster)
+ if key is None:
+ key = "/etc/ceph/{}.client.admin.keyring".format(cluster)
+
+ try:
+ osd_ips = get_osds_ips(node, conf, key)
+ except Exception as exc:
+ logger.error("OSD discovery failed: %s", exc)
+ osd_ips = set()
+
+ try:
+ mon_ips = get_mons_ips(node, conf, key)
+ except Exception as exc:
+ logger.error("MON discovery failed: %s", exc)
+ mon_ips = set()
+
+ ips = {} # type: Dict[str, List[str]]
for ip in osd_ips:
- url = "ssh://%s" % ip
- ips.setdefault(url, []).append("ceph-osd")
+ ips.setdefault(ip, []).append("ceph-osd")
for ip in mon_ips:
- url = "ssh://%s" % ip
- ips.setdefault(url, []).append("ceph-mon")
+ ips.setdefault(ip, []).append("ceph-mon")
- for ip in mds_ips:
- url = "ssh://%s" % ip
- ips.setdefault(url, []).append("ceph-mds")
-
- return [NodeInfo(url, set(roles)) for url, roles in ips.items()]
+ ssh_key = node.get_file_content("~/.ssh/id_rsa")
+ return [NodeInfo(ConnCreds(host=ip, user="root", key=ssh_key), set(roles)) for ip, roles in ips.items()]
-def get_osds_list(node: Node) -> Iterable[str]:
- """Get list of osd's id"""
- return filter(None, node.run("ceph osd ls").split("\n"))
+def get_osds_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
+ """Get set of osd's ip"""
-
-def get_mons_or_mds_ips(node: Node, who: str) -> Iterable[str]:
- """Return mon ip list. who - mon/mds"""
- assert who in ("mon", "mds"), \
- "{!r} in get_mons_or_mds_ips instead of mon/mds".format(who)
-
- line_res = node.run("ceph {0} dump".format(who)).split("\n")
-
- ips = set()
- for line in line_res:
- fields = line.split()
- if len(fields) > 2 and who in fields[2]:
- ips.add(fields[1].split(":")[0])
-
+ data = node.run("ceph -c {} -k {} --format json osd dump".format(conf, key))
+ jdata = json.loads(data)
+ ips = set() # type: Set[IP]
+ first_error = True
+ for osd_data in jdata["osds"]:
+ if "public_addr" not in osd_data:
+ if first_error:
+ osd_id = osd_data.get("osd", "<OSD_ID_MISSED>")
+ logger.warning("No 'public_addr' field in 'ceph osd dump' output for osd %s" +
+ "(all subsequent errors omitted)", osd_id)
+ first_error = False
+ else:
+ ip_port = osd_data["public_addr"]
+ if '/' in ip_port:
+ ip_port = ip_port.split("/", 1)[0]
+ ips.add(IP(ip_port.split(":")[0]))
return ips
-def get_osds_ips(node: Node, osd_list: Iterable[str]) -> Iterable[str]:
- """Get osd's ips. osd_list - list of osd names from osd ls command"""
- ips = set()
- for osd_id in osd_list:
- out = node.run("ceph osd find {0}".format(osd_id))
- ip = json.loads(out)["ip"]
- ips.add(str(ip.split(":")[0]))
+def get_mons_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
+ """Return mon ip set"""
+
+ data = node.run("ceph -c {} -k {} --format json mon_status".format(conf, key))
+ jdata = json.loads(data)
+ ips = set() # type: Set[IP]
+ first_error = True
+ for mon_data in jdata["monmap"]["mons"]:
+ if "addr" not in mon_data:
+ if first_error:
+ mon_name = mon_data.get("name", "<MON_NAME_MISSED>")
+ logger.warning("No 'addr' field in 'ceph mon_status' output for mon %s" +
+ "(all subsequent errors omitted)", mon_name)
+ first_error = False
+ else:
+ ip_port = mon_data["addr"]
+ if '/' in ip_port:
+ ip_port = ip_port.split("/", 1)[0]
+ ips.add(IP(ip_port.split(":")[0]))
+
return ips
diff --git a/wally/discover/discover.py b/wally/discover/discover.py
index d1eb9ac..e0d7dab 100644
--- a/wally/discover/discover.py
+++ b/wally/discover/discover.py
@@ -1,14 +1,18 @@
import os.path
import logging
+from typing import Dict, NamedTuple, List, Optional, cast
-from paramiko import AuthenticationException
+from paramiko.ssh_exception import AuthenticationException
from . import ceph
from . import fuel
from . import openstack
from ..utils import parse_creds, StopTestError
-from ..test_run_class import TestRun
-from ..node import Node
+from ..config import ConfigBlock
+from ..start_vms import OSCreds
+from ..node_interfaces import NodeInfo
+from ..node import connect, setup_rpc
+from ..ssh_utils import parse_ssh_uri
logger = logging.getLogger("wally.discover")
@@ -32,95 +36,81 @@
"""
-def discover(testrun: TestRun, discover_cfg, clusters_info, var_dir, discover_nodes=True):
+DiscoveryResult = NamedTuple("DiscoveryResult", [("os_creds", Optional[OSCreds]), ("nodes", List[NodeInfo])])
+
+
+def discover(discover_list: List[str], clusters_info: ConfigBlock, discover_nodes: bool = True) -> DiscoveryResult:
"""Discover nodes in clusters"""
- nodes_to_run = []
- clean_data = None
- for cluster in discover_cfg:
- if cluster == "openstack" and not discover_nodes:
- logger.warning("Skip openstack cluster discovery")
- elif cluster == "openstack" and discover_nodes:
- cluster_info = clusters_info["openstack"]
- conn = cluster_info['connection']
- user, passwd, tenant = parse_creds(conn['creds'])
+ new_nodes = [] # type: List[NodeInfo]
+ os_creds = None # type: Optional[OSCreds]
- auth_data = dict(
- auth_url=conn['auth_url'],
- username=user,
- api_key=passwd,
- project_id=tenant)
-
- if not conn:
- logger.error("No connection provided for %s. Skipping"
- % cluster)
+ for cluster in discover_list:
+ if cluster == "openstack":
+ if not discover_nodes:
+ logger.warning("Skip openstack cluster discovery")
continue
- logger.debug("Discovering openstack nodes "
- "with connection details: %r" %
- conn)
+ cluster_info = clusters_info["openstack"] # type: ConfigBlock
- os_nodes = openstack.discover_openstack_nodes(auth_data,
- cluster_info)
- nodes_to_run.extend(os_nodes)
+ conn = cluster_info['connection'] # type: ConfigBlock
+ if not conn:
+ logger.error("No connection provided for %s. Skipping", cluster)
+ continue
+
+ user, passwd, tenant = parse_creds(conn['creds'])
+
+ auth_data = dict(auth_url=conn['auth_url'],
+ username=user,
+ api_key=passwd,
+ project_id=tenant) # type: Dict[str, str]
+
+ logger.debug("Discovering openstack nodes with connection details: %r", conn)
+ new_nodes.extend(openstack.discover_openstack_nodes(auth_data, cluster_info))
elif cluster == "fuel" or cluster == "fuel_openrc_only":
if cluster == "fuel_openrc_only":
discover_nodes = False
- ssh_creds = clusters_info['fuel']['ssh_creds']
- fuel_node = Node(NodeInfo(ssh_creds, {'fuel_master'}))
-
+ fuel_node_info = NodeInfo(parse_ssh_uri(clusters_info['fuel']['ssh_creds']), {'fuel_master'})
try:
- fuel_node.connect_ssh()
+ fuel_rpc_conn = setup_rpc(connect(fuel_node_info))
except AuthenticationException:
raise StopTestError("Wrong fuel credentials")
except Exception:
logger.exception("While connection to FUEL")
raise StopTestError("Failed to connect to FUEL")
- fuel_node.connect_rpc()
+ with fuel_rpc_conn:
+ nodes, fuel_info = fuel.discover_fuel_nodes(fuel_rpc_conn, clusters_info['fuel'], discover_nodes)
+ new_nodes.extend(nodes)
- res = fuel.discover_fuel_nodes(fuel_node,
- clusters_info['fuel'],
- discover_nodes)
- nodes, clean_data, openrc_dict, version = res
+ if fuel_info.openrc:
+ auth_url = cast(str, fuel_info.openrc['os_auth_url'])
+ if fuel_info.version >= [8, 0] and auth_url.startswith("https://"):
+ logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
+ auth_url = auth_url.replace("https", "http", 1)
- if openrc_dict:
- if version >= [8, 0] and openrc_dict['os_auth_url'].startswith("https://"):
- logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
- openrc_dict['os_auth_url'] = "http" + openrc_dict['os_auth_url'][5:]
-
- testrun.fuel_openstack_creds = {
- 'name': openrc_dict['username'],
- 'passwd': openrc_dict['password'],
- 'tenant': openrc_dict['tenant_name'],
- 'auth_url': openrc_dict['os_auth_url'],
- 'insecure': openrc_dict['insecure']}
-
- env_name = clusters_info['fuel']['openstack_env']
- env_f_name = env_name
- for char in "-+ {}()[]":
- env_f_name = env_f_name.replace(char, '_')
-
- fuel_openrc_fname = os.path.join(var_dir,
- env_f_name + "_openrc")
-
- if testrun.fuel_openstack_creds is not None:
- with open(fuel_openrc_fname, "w") as fd:
- fd.write(openrc_templ.format(**testrun.fuel_openstack_creds))
- msg = "Openrc for cluster {0} saves into {1}"
- logger.info(msg.format(env_name, fuel_openrc_fname))
- nodes_to_run.extend(nodes)
+ os_creds = OSCreds(name=cast(str, fuel_info.openrc['username']),
+ passwd=cast(str, fuel_info.openrc['password']),
+ tenant=cast(str, fuel_info.openrc['tenant_name']),
+ auth_url=cast(str, auth_url),
+ insecure=cast(bool, fuel_info.openrc['insecure']))
elif cluster == "ceph":
if discover_nodes:
cluster_info = clusters_info["ceph"]
- nodes_to_run.extend(ceph.discover_ceph_nodes(cluster_info))
+ root_node_uri = cast(str, cluster_info["root_node"])
+ cluster = clusters_info["ceph"].get("cluster", "ceph")
+ conf = clusters_info["ceph"].get("conf")
+ key = clusters_info["ceph"].get("key")
+ info = NodeInfo(parse_ssh_uri(root_node_uri), set())
+ with setup_rpc(connect(info)) as ceph_root_conn:
+ new_nodes.extend(ceph.discover_ceph_nodes(ceph_root_conn, cluster=cluster, conf=conf, key=key))
else:
logger.warning("Skip ceph cluster discovery")
else:
- msg_templ = "Unknown cluster type in 'discover' parameter: {0!r}"
+ msg_templ = "Unknown cluster type in 'discover' parameter: {!r}"
raise ValueError(msg_templ.format(cluster))
- return nodes_to_run
+ return DiscoveryResult(os_creds, new_nodes)
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
index 1443a9f..119cbd0 100644
--- a/wally/discover/fuel.py
+++ b/wally/discover/fuel.py
@@ -1,20 +1,25 @@
-import socket
import logging
-from typing import Dict, Any, Tuple, List
+import socket
+from typing import Dict, Any, Tuple, List, NamedTuple, Union
from urllib.parse import urlparse
-
from .. import fuel_rest_api
+from ..node_interfaces import NodeInfo, IRPCNode
+from ..ssh_utils import ConnCreds
from ..utils import parse_creds, check_input_param
-from ..node import NodeInfo, Node, FuelNodeInfo
-
logger = logging.getLogger("wally.discover")
-def discover_fuel_nodes(fuel_master_node: Node,
+FuelNodeInfo = NamedTuple("FuelNodeInfo",
+ [("version", List[int]),
+ ("fuel_ext_iface", str),
+ ("openrc", Dict[str, Union[str, bool]])])
+
+
+def discover_fuel_nodes(fuel_master_node: IRPCNode,
fuel_data: Dict[str, Any],
- discover_nodes: bool=True) -> Tuple[List[NodeInfo], FuelNodeInfo]:
+ discover_nodes: bool = True) -> Tuple[List[NodeInfo], FuelNodeInfo]:
"""Discover nodes in fuel cluster, get openrc for selected cluster"""
# parse FUEL REST credentials
@@ -51,17 +56,10 @@
logger.debug("Downloading fuel master key")
fuel_key = fuel_master_node.get_file_content('/root/.ssh/id_rsa')
- # forward ports of cluster nodes to FUEL master
- logger.info("Forwarding ssh ports from FUEL nodes to localhost")
- ips = [str(fuel_node.get_ip(network)) for fuel_node in fuel_nodes]
- port_fw = [fuel_master_node.forward_port(ip, 22) for ip in ips]
- listen_ip = fuel_master_node.get_ip()
-
nodes = []
- for port, fuel_node, ip in zip(port_fw, fuel_nodes, ips):
- logger.debug("SSH port forwarding {} => {}:{}".format(ip, listen_ip, port))
- conn_url = "ssh://root@{}:{}".format(listen_ip, port)
- nodes.append(NodeInfo(conn_url, fuel_node['roles'], listen_ip, fuel_key))
+ for fuel_node in fuel_nodes:
+ ip = str(fuel_node.get_ip(network))
+ nodes.append(NodeInfo(ConnCreds(ip, "root", key=fuel_key), roles=set(fuel_node.get_roles())))
logger.debug("Found {} fuel nodes for env {}".format(len(nodes), fuel_data['openstack_env']))
diff --git a/wally/discover/openstack.py b/wally/discover/openstack.py
index bb128c3..88e8656 100644
--- a/wally/discover/openstack.py
+++ b/wally/discover/openstack.py
@@ -1,18 +1,19 @@
import socket
import logging
-from typing import Iterable, Dict, Any
+from typing import Dict, Any, List
from novaclient.client import Client
-from ..node import NodeInfo
-from wally.utils import parse_creds
+from ..node_interfaces import NodeInfo
+from ..config import ConfigBlock
+from ..utils import parse_creds
logger = logging.getLogger("wally.discover")
-def get_floating_ip(vm) -> str:
+def get_floating_ip(vm: Any) -> str:
"""Get VM floating IP address"""
for net_name, ifaces in vm.addresses.items():
@@ -34,22 +35,21 @@
return "ssh://{}@{}::{}".format(user, ip, key)
-def discover_vms(client: Client, search_opts) -> Iterable[NodeInfo]:
+def discover_vms(client: Client, search_opts: Dict) -> List[NodeInfo]:
"""Discover virtual machines"""
user, password, key = parse_creds(search_opts.pop('auth'))
servers = client.servers.list(search_opts=search_opts)
logger.debug("Found %s openstack vms" % len(servers))
- nodes = []
+ nodes = [] # type: List[NodeInfo]
for server in servers:
ip = get_floating_ip(server)
- nodes.append(NodeInfo(get_ssh_url(user, password, ip, key), ["test_vm"]))
-
+ nodes.append(NodeInfo(get_ssh_url(user, password, ip, key), roles={"test_vm"}))
return nodes
-def discover_services(client: Client, opts: Dict[str, Any]) -> Iterable[NodeInfo]:
+def discover_services(client: Client, opts: Dict[str, Any]) -> List[NodeInfo]:
"""Discover openstack services for given cluster"""
user, password, key = parse_creds(opts.pop('auth'))
@@ -63,15 +63,16 @@
for s in opts['service']:
services.extend(client.services.list(binary=s))
- host_services_mapping = {}
+ host_services_mapping = {} # type: Dict[str, [str]]
for service in services:
ip = socket.gethostbyname(service.host)
- host_services_mapping[ip].append(service.binary)
+ host_services_mapping.get(ip, []).append(service.binary)
logger.debug("Found %s openstack service nodes" %
len(host_services_mapping))
- nodes = []
+
+ nodes = [] # type: List[NodeInfo]
for host, services in host_services_mapping.items():
ssh_url = get_ssh_url(user, password, host, key)
nodes.append(NodeInfo(ssh_url, services))
@@ -79,7 +80,7 @@
return nodes
-def discover_openstack_nodes(conn_details: Dict[str, str], conf: Dict[str, Any]) -> Iterable[NodeInfo]:
+def discover_openstack_nodes(conn_details: Dict[str, str], conf: ConfigBlock) -> List[NodeInfo]:
"""Discover vms running in openstack
conn_details - dict with openstack connection details -
auth_url, api_key (password), username