koder aka kdanilov | 39e449e | 2016-12-17 15:15:26 +0200 | [diff] [blame] | 1 | """ Collect data about ceph nodes""" |
| 2 | import json |
| 3 | import logging |
| 4 | from typing import Set, Dict, cast |
| 5 | |
| 6 | |
| 7 | from .node_interfaces import NodeInfo, IRPCNode |
| 8 | from .ssh_utils import ConnCreds |
| 9 | from .common_types import IP |
| 10 | from .stage import Stage, StepOrder |
| 11 | from .test_run_class import TestRun |
| 12 | from .ssh_utils import parse_ssh_uri |
| 13 | from .node import connect, setup_rpc |
| 14 | |
| 15 | |
# Module logger; every message logged by this module goes to the "wally.discover" channel.
logger = logging.getLogger(name="wally.discover")
| 17 | |
| 18 | |
def _ceph_addr_to_ip(addr: str) -> IP:
    """Return the host part of a Ceph address string.

    Handles the ``host:port/nonce`` form, an optional messenger type prefix
    (``v1:``, ``v2:``, ``any:``) and bracketed IPv6 hosts such as
    ``[2001:db8::1]:6789/0``.
    """
    host_port = addr.split("/", 1)[0]
    if host_port.startswith(("v1:", "v2:", "any:")):
        # Drop the messenger type prefix; otherwise it would be taken as the host.
        host_port = host_port.split(":", 1)[1]
    if host_port.startswith("["):
        # IPv6: the host is between the brackets; splitting on ':' would yield '['.
        return IP(host_port[1:].split("]", 1)[0])
    return IP(host_port.split(":", 1)[0])


def get_osds_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
    """Return the set of OSD public IPs.

    Runs ``ceph osd dump`` (JSON) on *node* with config file *conf* and keyring
    *key*, and collects the host part of every OSD's ``public_addr``. OSDs without
    that field are skipped; only the first one is reported in the log.

    Raises ValueError if the command output is not JSON, and KeyError if it has
    no ``osds`` list. Errors raised by ``node.run`` propagate.
    """
    data = node.run("ceph -c {} -k {} --format json osd dump".format(conf, key))
    jdata = json.loads(data)
    ips = set()  # type: Set[IP]
    first_error = True
    for osd_data in jdata["osds"]:
        if "public_addr" in osd_data:
            ips.add(_ceph_addr_to_ip(osd_data["public_addr"]))
        elif first_error:
            osd_id = osd_data.get("osd", "<OSD_ID_MISSED>")
            logger.warning("No 'public_addr' field in 'ceph osd dump' output for osd %s "
                           "(all subsequent errors omitted)", osd_id)
            first_error = False
    return ips


def get_mons_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
    """Return the set of monitor IPs.

    Runs ``ceph mon_status`` (JSON) on *node* with config file *conf* and keyring
    *key*, and collects the host part of every monitor's ``addr`` in
    ``monmap.mons``. Monitors without that field are skipped; only the first one
    is reported in the log.

    Raises ValueError if the command output is not JSON, and KeyError if it lacks
    ``monmap``/``mons``. Errors raised by ``node.run`` propagate.
    """
    data = node.run("ceph -c {} -k {} --format json mon_status".format(conf, key))
    jdata = json.loads(data)
    ips = set()  # type: Set[IP]
    first_error = True
    for mon_data in jdata["monmap"]["mons"]:
        if "addr" in mon_data:
            ips.add(_ceph_addr_to_ip(mon_data["addr"]))
        elif first_error:
            mon_name = mon_data.get("name", "<MON_NAME_MISSED>")
            logger.warning("No 'addr' field in 'ceph mon_status' output for mon %s "
                           "(all subsequent errors omitted)", mon_name)
            first_error = False
    return ips
| 62 | |
| 63 | |
class DiscoverCephStage(Stage):
    """Discovery stage that adds Ceph OSD and monitor nodes to the test run.

    Discovered nodes are appended to ``ctx.nodes_info`` and saved in
    ``ctx.storage``. When storage already holds them from an earlier run, they
    are loaded from there instead of querying the cluster again.
    """

    config_block = 'ceph'
    priority = StepOrder.DISCOVER

    def run(self, ctx: TestRun) -> None:
        """Append the cluster's Ceph nodes (as NodeInfo) to ``ctx.nodes_info``.

        Reads ``root_node`` and the optional ``cluster`` (default ``"ceph"``),
        ``conf`` and ``key`` settings from ``ctx.config.ceph``. A missing ``conf``
        defaults to ``/etc/ceph/<cluster>.conf``. A missing ``key`` defaults to
        ``/etc/ceph/<cluster>.client.admin.keyring``. If discovering OSDs or
        monitors fails, the error is logged and discovery continues with the
        next role.
        """
        # The lookup and the save must use the same key, otherwise previously
        # discovered nodes are never reused.
        storage_key = 'ceph_nodes'

        if storage_key in ctx.storage:
            ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, storage_key))
            return

        ceph = ctx.config.ceph
        root_node_uri = cast(str, ceph.root_node)
        cluster = ceph.get("cluster", "ceph")
        conf = ceph.get("conf")
        key = ceph.get("key")
        info = NodeInfo(parse_ssh_uri(root_node_uri), set())
        ceph_nodes = {}  # type: Dict[IP, NodeInfo]

        if conf is None:
            conf = "/etc/ceph/{}.conf".format(cluster)

        if key is None:
            key = "/etc/ceph/{}.client.admin.keyring".format(cluster)

        with setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins) as node:
            # Every discovered node is reached as root with the root node's private key.
            ssh_key = node.get_file_content("~/.ssh/id_rsa")

            for get_ips, role, title in ((get_osds_ips, "ceph-osd", "OSD"),
                                         (get_mons_ips, "ceph-mon", "MON")):
                try:
                    for ip in get_ips(node, conf, key):
                        if ip in ceph_nodes:
                            # Same host runs several daemons: merge roles into one NodeInfo.
                            ceph_nodes[ip].roles.add(role)
                        else:
                            creds = ConnCreds(cast(str, ip), user="root", key=ssh_key)
                            ceph_nodes[ip] = NodeInfo(creds, {role})
                except Exception as exc:
                    # Best effort: failing to discover one role must not block the other.
                    logger.error("%s discovery failed: %s", title, exc)

        ctx.nodes_info.extend(ceph_nodes.values())
        # NOTE(review): assumes storage round-trips this list through
        # load_list(NodeInfo, ...) used above — confirm against the storage class.
        ctx.storage[storage_key] = list(ceph_nodes.values())