| koder aka kdanilov | 39e449e | 2016-12-17 15:15:26 +0200 | [diff] [blame] | 1 | """ Collect data about ceph nodes""" | 
 | 2 | import json | 
 | 3 | import logging | 
| koder aka kdanilov | 962ee5f | 2016-12-19 02:40:08 +0200 | [diff] [blame] | 4 | from typing import Dict, cast, List, Set, Optional | 
| koder aka kdanilov | 39e449e | 2016-12-17 15:15:26 +0200 | [diff] [blame] | 5 |  | 
 | 6 |  | 
 | 7 | from .node_interfaces import NodeInfo, IRPCNode | 
 | 8 | from .ssh_utils import ConnCreds | 
 | 9 | from .common_types import IP | 
 | 10 | from .stage import Stage, StepOrder | 
 | 11 | from .test_run_class import TestRun | 
 | 12 | from .ssh_utils import parse_ssh_uri | 
 | 13 | from .node import connect, setup_rpc | 
| koder aka kdanilov | bbbe1dc | 2016-12-20 01:19:56 +0200 | [diff] [blame^] | 14 | from .utils import StopTestError, to_ip | 
| koder aka kdanilov | 39e449e | 2016-12-17 15:15:26 +0200 | [diff] [blame] | 15 |  | 
 | 16 |  | 
# Module-level logger; all messages from this file are emitted under the "wally" logger name.
logger = logging.getLogger("wally")
| koder aka kdanilov | 39e449e | 2016-12-17 15:15:26 +0200 | [diff] [blame] | 18 |  | 
 | 19 |  | 
class OSDInfo:
    """Plain record describing a single ceph OSD.

    Attributes are set directly from the constructor arguments:
    ``id`` is the numeric OSD id, ``journal`` and ``storage`` are the
    journal and data paths (``None`` when not provided).
    """

    def __init__(self, id: int, journal: Optional[str] = None, storage: Optional[str] = None) -> None:
        # NOTE: the parameter name `id` shadows the builtin, but it is part of
        # the public signature and is kept for backward compatibility.
        self.id = id
        self.journal = journal
        self.storage = storage

    def __repr__(self) -> str:
        return "OSDInfo(id={!r}, journal={!r}, storage={!r})".format(
            self.id, self.journal, self.storage)
 | 25 |  | 
 | 26 |  | 
 | 27 | def get_osds_info(node: IRPCNode, conf: str, key: str) -> Dict[IP, List[OSDInfo]]: | 
| koder aka kdanilov | 39e449e | 2016-12-17 15:15:26 +0200 | [diff] [blame] | 28 |     """Get set of osd's ip""" | 
 | 29 |  | 
 | 30 |     data = node.run("ceph -c {} -k {} --format json osd dump".format(conf, key)) | 
| koder aka kdanilov | 3af3c33 | 2016-12-19 17:12:34 +0200 | [diff] [blame] | 31 |     try: | 
 | 32 |         jdata = json.loads(data) | 
 | 33 |     except: | 
 | 34 |         open("/tmp/ceph-out.json", "w").write(data) | 
 | 35 |         raise | 
| koder aka kdanilov | 962ee5f | 2016-12-19 02:40:08 +0200 | [diff] [blame] | 36 |     ips = {}  # type: Dict[IP, List[OSDInfo]] | 
| koder aka kdanilov | 39e449e | 2016-12-17 15:15:26 +0200 | [diff] [blame] | 37 |     first_error = True | 
 | 38 |     for osd_data in jdata["osds"]: | 
| koder aka kdanilov | 962ee5f | 2016-12-19 02:40:08 +0200 | [diff] [blame] | 39 |         osd_id = int(osd_data["osd"]) | 
| koder aka kdanilov | 39e449e | 2016-12-17 15:15:26 +0200 | [diff] [blame] | 40 |         if "public_addr" not in osd_data: | 
 | 41 |             if first_error: | 
| koder aka kdanilov | 39e449e | 2016-12-17 15:15:26 +0200 | [diff] [blame] | 42 |                 logger.warning("No 'public_addr' field in 'ceph osd dump' output for osd %s" + | 
 | 43 |                                "(all subsequent errors omitted)", osd_id) | 
 | 44 |                 first_error = False | 
 | 45 |         else: | 
 | 46 |             ip_port = osd_data["public_addr"] | 
 | 47 |             if '/' in ip_port: | 
 | 48 |                 ip_port = ip_port.split("/", 1)[0] | 
| koder aka kdanilov | 962ee5f | 2016-12-19 02:40:08 +0200 | [diff] [blame] | 49 |             ip = IP(ip_port.split(":")[0]) | 
 | 50 |  | 
 | 51 |             osd_journal_path = None  # type: Optional[str] | 
 | 52 |             osd_data_path = None  # type: Optional[str] | 
 | 53 |  | 
 | 54 |             # TODO: parallelize this! | 
 | 55 |             osd_cfg = node.run("ceph -n osd.{} --show-config".format(osd_id)) | 
 | 56 |             for line in osd_cfg.split("\n"): | 
 | 57 |                 if line.startswith("osd_journal ="): | 
 | 58 |                     osd_journal_path = line.split("=")[1].strip() | 
 | 59 |                 elif line.startswith("osd_data ="): | 
 | 60 |                     osd_data_path = line.split("=")[1].strip() | 
 | 61 |  | 
 | 62 |             if osd_data_path is None or osd_journal_path is None: | 
| koder aka kdanilov | 3af3c33 | 2016-12-19 17:12:34 +0200 | [diff] [blame] | 63 |                 open("/tmp/ceph-out.json", "w").write(osd_cfg) | 
| koder aka kdanilov | 962ee5f | 2016-12-19 02:40:08 +0200 | [diff] [blame] | 64 |                 logger.error("Can't detect osd %s journal or storage path", osd_id) | 
 | 65 |                 raise StopTestError() | 
 | 66 |  | 
 | 67 |             ips.setdefault(ip, []).append(OSDInfo(osd_id, | 
 | 68 |                                                   journal=osd_journal_path, | 
 | 69 |                                                   storage=osd_data_path)) | 
| koder aka kdanilov | 39e449e | 2016-12-17 15:15:26 +0200 | [diff] [blame] | 70 |     return ips | 
 | 71 |  | 
 | 72 |  | 
 | 73 | def get_mons_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]: | 
 | 74 |     """Return mon ip set""" | 
 | 75 |  | 
 | 76 |     data = node.run("ceph -c {} -k {} --format json mon_status".format(conf, key)) | 
 | 77 |     jdata = json.loads(data) | 
 | 78 |     ips = set()  # type: Set[IP] | 
 | 79 |     first_error = True | 
 | 80 |     for mon_data in jdata["monmap"]["mons"]: | 
 | 81 |         if "addr" not in mon_data: | 
 | 82 |             if first_error: | 
 | 83 |                 mon_name = mon_data.get("name", "<MON_NAME_MISSED>") | 
 | 84 |                 logger.warning("No 'addr' field in 'ceph mon_status' output for mon %s" + | 
 | 85 |                                "(all subsequent errors omitted)", mon_name) | 
 | 86 |                 first_error = False | 
 | 87 |         else: | 
 | 88 |             ip_port = mon_data["addr"] | 
 | 89 |             if '/' in ip_port: | 
 | 90 |                 ip_port = ip_port.split("/", 1)[0] | 
 | 91 |             ips.add(IP(ip_port.split(":")[0])) | 
 | 92 |  | 
 | 93 |     return ips | 
 | 94 |  | 
 | 95 |  | 
 | 96 | class DiscoverCephStage(Stage): | 
 | 97 |     config_block = 'ceph' | 
 | 98 |     priority = StepOrder.DISCOVER | 
 | 99 |  | 
 | 100 |     def run(self, ctx: TestRun) -> None: | 
 | 101 |         """Return list of ceph's nodes NodeInfo""" | 
 | 102 |  | 
| koder aka kdanilov | 962ee5f | 2016-12-19 02:40:08 +0200 | [diff] [blame] | 103 |         discovery = ctx.config.get("discovery") | 
 | 104 |         if discovery == 'disable' or discovery == 'metadata': | 
 | 105 |             logger.info("Skip ceph discovery due to config setting") | 
 | 106 |             return | 
| koder aka kdanilov | 39e449e | 2016-12-17 15:15:26 +0200 | [diff] [blame] | 107 |  | 
| koder aka kdanilov | 962ee5f | 2016-12-19 02:40:08 +0200 | [diff] [blame] | 108 |         if 'all_nodes' in ctx.storage: | 
 | 109 |             logger.debug("Skip ceph discovery, use previously discovered nodes") | 
 | 110 |             return | 
| koder aka kdanilov | 39e449e | 2016-12-17 15:15:26 +0200 | [diff] [blame] | 111 |  | 
| koder aka kdanilov | 962ee5f | 2016-12-19 02:40:08 +0200 | [diff] [blame] | 112 |         ceph = ctx.config.ceph | 
 | 113 |         root_node_uri = cast(str, ceph.root_node) | 
 | 114 |         cluster = ceph.get("cluster", "ceph") | 
 | 115 |         conf = ceph.get("conf") | 
 | 116 |         key = ceph.get("key") | 
| koder aka kdanilov | 39e449e | 2016-12-17 15:15:26 +0200 | [diff] [blame] | 117 |  | 
| koder aka kdanilov | 962ee5f | 2016-12-19 02:40:08 +0200 | [diff] [blame] | 118 |         logger.debug("Start discovering ceph nodes from root %s", root_node_uri) | 
 | 119 |         logger.debug("cluster=%s key=%s conf=%s", cluster, conf, key) | 
| koder aka kdanilov | 39e449e | 2016-12-17 15:15:26 +0200 | [diff] [blame] | 120 |  | 
| koder aka kdanilov | 962ee5f | 2016-12-19 02:40:08 +0200 | [diff] [blame] | 121 |         info = NodeInfo(parse_ssh_uri(root_node_uri), set()) | 
| koder aka kdanilov | 39e449e | 2016-12-17 15:15:26 +0200 | [diff] [blame] | 122 |  | 
| koder aka kdanilov | 962ee5f | 2016-12-19 02:40:08 +0200 | [diff] [blame] | 123 |         if conf is None: | 
 | 124 |             conf = "/etc/ceph/{}.conf".format(cluster) | 
| koder aka kdanilov | 39e449e | 2016-12-17 15:15:26 +0200 | [diff] [blame] | 125 |  | 
| koder aka kdanilov | 962ee5f | 2016-12-19 02:40:08 +0200 | [diff] [blame] | 126 |         if key is None: | 
 | 127 |             key = "/etc/ceph/{}.client.admin.keyring".format(cluster) | 
| koder aka kdanilov | 39e449e | 2016-12-17 15:15:26 +0200 | [diff] [blame] | 128 |  | 
| koder aka kdanilov | 962ee5f | 2016-12-19 02:40:08 +0200 | [diff] [blame] | 129 |         ceph_params = {"cluster": cluster, "conf": conf, "key": key} | 
 | 130 |  | 
 | 131 |         with setup_rpc(connect(info), | 
 | 132 |                        ctx.rpc_code, | 
 | 133 |                        ctx.default_rpc_plugins, | 
 | 134 |                        log_level=ctx.config.rpc_log_level) as node: | 
 | 135 |  | 
 | 136 |             ssh_key = node.get_file_content("~/.ssh/id_rsa") | 
 | 137 |  | 
 | 138 |  | 
 | 139 |             try: | 
 | 140 |                 ips = set() | 
 | 141 |                 for ip, osds_info in get_osds_info(node, conf, key).items(): | 
 | 142 |                     ips.add(ip) | 
| koder aka kdanilov | bbbe1dc | 2016-12-20 01:19:56 +0200 | [diff] [blame^] | 143 |                     creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key) | 
| koder aka kdanilov | 962ee5f | 2016-12-19 02:40:08 +0200 | [diff] [blame] | 144 |                     info = ctx.merge_node(creds, {'ceph-osd'}) | 
 | 145 |                     info.params.setdefault('ceph-osds', []).extend(osds_info) | 
 | 146 |                     assert 'ceph' not in info.params or info.params['ceph'] == ceph_params | 
 | 147 |                     info.params['ceph'] = ceph_params | 
 | 148 |  | 
 | 149 |                 logger.debug("Found %s nodes with ceph-osd role", len(ips)) | 
 | 150 |             except Exception as exc: | 
 | 151 |                 if discovery != 'ignore_errors': | 
 | 152 |                     logger.exception("OSD discovery failed") | 
 | 153 |                     raise StopTestError() | 
 | 154 |                 else: | 
 | 155 |                     logger.warning("OSD discovery failed %s", exc) | 
 | 156 |  | 
 | 157 |             try: | 
 | 158 |                 counter = 0 | 
 | 159 |                 for counter, ip in enumerate(get_mons_ips(node, conf, key)): | 
| koder aka kdanilov | bbbe1dc | 2016-12-20 01:19:56 +0200 | [diff] [blame^] | 160 |                     creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key) | 
| koder aka kdanilov | 962ee5f | 2016-12-19 02:40:08 +0200 | [diff] [blame] | 161 |                     info = ctx.merge_node(creds, {'ceph-mon'}) | 
 | 162 |                     assert 'ceph' not in info.params or info.params['ceph'] == ceph_params | 
 | 163 |                     info.params['ceph'] = ceph_params | 
 | 164 |                 logger.debug("Found %s nodes with ceph-mon role", counter + 1) | 
 | 165 |             except Exception as exc: | 
 | 166 |                 if discovery != 'ignore_errors': | 
 | 167 |                     logger.exception("MON discovery failed") | 
 | 168 |                     raise StopTestError() | 
 | 169 |                 else: | 
 | 170 |                     logger.warning("MON discovery failed %s", exc) |