blob: 1e7912635efe85c9109ec6b777f34b00c71db385 [file] [log] [blame]
koder aka kdanilov39e449e2016-12-17 15:15:26 +02001""" Collect data about ceph nodes"""
2import json
3import logging
4from typing import Set, Dict, cast
5
6
7from .node_interfaces import NodeInfo, IRPCNode
8from .ssh_utils import ConnCreds
9from .common_types import IP
10from .stage import Stage, StepOrder
11from .test_run_class import TestRun
12from .ssh_utils import parse_ssh_uri
13from .node import connect, setup_rpc
14
15
16logger = logging.getLogger("wally.discover")
17
18
def get_osds_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
    """Return the set of host addresses of the cluster's OSDs.

    Runs ``ceph ... osd dump`` on ``node`` and parses its JSON output.

    Args:
        node: RPC node on which the ``ceph`` command is executed.
        conf: Path of the ceph config file, passed to ``ceph -c``.
        key: Path of the keyring file, passed to ``ceph -k``.

    Returns:
        The host part of every ``public_addr`` found in the dump. Entries
        without a ``public_addr`` field are skipped; only the first such
        entry is reported in the log.

    Raises:
        ValueError: If the command output is not valid JSON.
        KeyError: If the parsed output has no ``"osds"`` key.
    """
    data = node.run("ceph -c {} -k {} --format json osd dump".format(conf, key))
    jdata = json.loads(data)
    ips = set()  # type: Set[IP]
    first_error = True

    for osd_data in jdata["osds"]:
        if "public_addr" not in osd_data:
            # Warn only once so a broken cluster does not flood the log.
            if first_error:
                osd_id = osd_data.get("osd", "<OSD_ID_MISSED>")
                logger.warning("No 'public_addr' field in 'ceph osd dump' output for osd %s "
                               "(all subsequent errors omitted)", osd_id)
                first_error = False
            continue

        # Keep only the part before the first '/', i.e. "host:port".
        addr = osd_data["public_addr"].split("/", 1)[0]
        if addr.startswith("["):
            # Bracketed IPv6 literal such as "[2001:db8::1]:6800": the host is
            # the text between the brackets; splitting on ':' would yield "[2001".
            host = addr[1:].split("]", 1)[0]
        else:
            host = addr.split(":")[0]
        ips.add(IP(host))

    return ips
39
40
def get_mons_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
    """Return the set of host addresses of the cluster's monitors.

    Runs ``ceph ... mon_status`` on ``node`` and parses its JSON output.

    Args:
        node: RPC node on which the ``ceph`` command is executed.
        conf: Path of the ceph config file, passed to ``ceph -c``.
        key: Path of the keyring file, passed to ``ceph -k``.

    Returns:
        The host part of every ``addr`` found under ``monmap.mons``. Entries
        without an ``addr`` field are skipped; only the first such entry is
        reported in the log.

    Raises:
        ValueError: If the command output is not valid JSON.
        KeyError: If the parsed output has no ``"monmap"``/``"mons"`` keys.
    """
    data = node.run("ceph -c {} -k {} --format json mon_status".format(conf, key))
    jdata = json.loads(data)
    ips = set()  # type: Set[IP]
    first_error = True

    for mon_data in jdata["monmap"]["mons"]:
        if "addr" not in mon_data:
            # Warn only once so a broken cluster does not flood the log.
            if first_error:
                mon_name = mon_data.get("name", "<MON_NAME_MISSED>")
                logger.warning("No 'addr' field in 'ceph mon_status' output for mon %s "
                               "(all subsequent errors omitted)", mon_name)
                first_error = False
            continue

        # Keep only the part before the first '/', i.e. "host:port".
        addr = mon_data["addr"].split("/", 1)[0]
        if addr.startswith("["):
            # Bracketed IPv6 literal such as "[2001:db8::1]:6789": the host is
            # the text between the brackets; splitting on ':' would yield "[2001".
            host = addr[1:].split("]", 1)[0]
        else:
            host = addr.split(":")[0]
        ips.add(IP(host))

    return ips
62
63
class DiscoverCephStage(Stage):
    """Stage that finds the OSD and MON nodes of a ceph cluster."""

    config_block = 'ceph'
    priority = StepOrder.DISCOVER

    def run(self, ctx: TestRun) -> None:
        """Add the cluster's ceph nodes to ``ctx.nodes_info``.

        If a node list is already present in ``ctx.storage`` it is loaded
        from there. Otherwise the root node named in the ``ceph`` config
        block is connected to, OSD and MON addresses are queried through
        it, and the resulting nodes are appended to ``ctx.nodes_info`` and
        saved to ``ctx.storage``.

        A failure while querying OSDs or MONs is logged and does not abort
        the stage; whatever was discovered up to that point is kept.

        Args:
            ctx: Current test run; provides config, storage and node list.
        """
        # One key for lookup, load and store. The store previously used
        # 'ceph-nodes', so the cached list was never found on later runs.
        storage_key = 'ceph_nodes'

        if storage_key in ctx.storage:
            ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, storage_key))
            return

        ceph = ctx.config.ceph
        root_node_uri = cast(str, ceph.root_node)
        cluster = ceph.get("cluster", "ceph")
        conf = ceph.get("conf")
        key = ceph.get("key")
        info = NodeInfo(parse_ssh_uri(root_node_uri), set())
        ceph_nodes = {}  # type: Dict[IP, NodeInfo]

        # Fall back to paths derived from the cluster name.
        if conf is None:
            conf = "/etc/ceph/{}.conf".format(cluster)

        if key is None:
            key = "/etc/ceph/{}.client.admin.keyring".format(cluster)

        # (role assigned to the node, function listing its ips, label for the log)
        discoverers = (
            ("ceph-osd", get_osds_ips, "OSD"),
            ("ceph-mon", get_mons_ips, "MON"),
        )

        with setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins) as node:
            # The root node's private key is used as the credential for
            # every discovered node.
            ssh_key = node.get_file_content("~/.ssh/id_rsa")

            for role, get_ips, label in discoverers:
                try:
                    for ip in get_ips(node, conf, key):
                        if ip in ceph_nodes:
                            # Same host already seen under another role.
                            ceph_nodes[ip].roles.add(role)
                        else:
                            creds = ConnCreds(cast(str, ip), user="root", key=ssh_key)
                            ceph_nodes[ip] = NodeInfo(creds, {role})
                except Exception as exc:
                    # Best effort: keep going with the nodes found so far.
                    logger.error("%s discovery failed: %s", label, exc)

        ctx.nodes_info.extend(ceph_nodes.values())
        ctx.storage[storage_key] = list(ceph_nodes.values())