blob: 35e78b494ddb2433b70dd1133ee1628e4670d127 [file] [log] [blame]
koder aka kdanilov39e449e2016-12-17 15:15:26 +02001""" Collect data about ceph nodes"""
koder aka kdanilov39e449e2016-12-17 15:15:26 +02002import logging
kdanylov aka kodercdfcdaf2017-04-29 10:03:39 +03003from typing import Dict, cast, List, Set
koder aka kdanilov39e449e2016-12-17 15:15:26 +02004
kdanylov aka koder0e0cfcb2017-03-27 22:19:09 +03005from cephlib import discover
6from cephlib.discover import OSDInfo
kdanylov aka koder026e5f22017-05-15 01:04:39 +03007from cephlib.common import to_ip
8from cephlib.node import NodeInfo, IRPCNode
9from cephlib.ssh import ConnCreds, IP, parse_ssh_uri
10from cephlib.node_impl import connect, setup_rpc
11
12from .stage import Stage, StepOrder
13from .test_run_class import TestRun
14from .utils import StopTestError
kdanylov aka koder0e0cfcb2017-03-27 22:19:09 +030015
16
koder aka kdanilov962ee5f2016-12-19 02:40:08 +020017logger = logging.getLogger("wally")
koder aka kdanilov39e449e2016-12-17 15:15:26 +020018
19
def get_osds_info(node: IRPCNode, ceph_extra_args: str = "", thcount: int = 8) -> Dict[IP, List[OSDInfo]]:
    """Return the OSD info lists reported by cephlib, keyed by IP.

    Args:
        node: RPC node whose ``run`` callable is handed to
            ``discover.get_osds_nodes`` to execute the discovery commands.
        ceph_extra_args: extra command-line arguments forwarded unchanged
            to ``discover.get_osds_nodes``.
        thcount: forwarded as the ``thcount`` keyword of
            ``discover.get_osds_nodes`` (presumably a worker thread count;
            confirm against cephlib).

    Returns:
        The mapping produced by ``discover.get_osds_nodes`` with every key
        wrapped in ``IP``; values are passed through untouched.
    """
    osds_by_address = discover.get_osds_nodes(node.run, ceph_extra_args, thcount=thcount)
    return {IP(ip): osd_info_list for ip, osd_info_list in osds_by_address.items()}
koder aka kdanilov39e449e2016-12-17 15:15:26 +020025
26
def get_mons_ips(node: IRPCNode, ceph_extra_args: str = "") -> Set[IP]:
    """Collect the monitor addresses reported by cephlib as a set of IP.

    Each value of the mapping returned by ``discover.get_mons_nodes`` is
    unpacked as a pair; only its first element is kept, wrapped in ``IP``.
    """
    mon_ips: Set[IP] = set()
    for mon_ip, _ in discover.get_mons_nodes(node.run, ceph_extra_args).values():
        mon_ips.add(IP(mon_ip))
    return mon_ips
koder aka kdanilov39e449e2016-12-17 15:15:26 +020030
31
class DiscoverCephStage(Stage):
    """Stage that discovers ceph OSD and MON nodes through a root node."""

    config_block = 'ceph'
    priority = StepOrder.DISCOVER

    def run(self, ctx: TestRun) -> None:
        """Discover ceph nodes and merge them into ``ctx``.

        Connects to ``ctx.config.ceph.root_node``, asks it for the OSD and
        MON addresses and registers each one via ``ctx.merge_node`` with the
        'ceph-osd' / 'ceph-mon' role. Every merged node gets the cluster
        name, conf path and key path stored under ``params['ceph']``; OSD
        nodes additionally get per-OSD info dicts under
        ``params['ceph-osds']``.

        Nothing is returned. Discovery is skipped entirely when
        'all_nodes' is already present in ``ctx.storage``.

        Raises:
            StopTestError: when 'metadata' discovery is requested (not
                implemented), or when OSD/MON discovery fails and
                'ignore_errors' is not set in ``ctx.config.discover``.
        """
        if 'all_nodes' in ctx.storage:
            logger.debug("Skip ceph discovery, use previously discovered nodes")
            return

        if 'metadata' in ctx.config.discover:
            # No exception is being handled here, so logger.exception would
            # append a meaningless "NoneType: None" traceback to the record.
            logger.error("Ceph metadata discovery is not implemented")
            raise StopTestError()

        ignore_errors = 'ignore_errors' in ctx.config.discover
        ceph = ctx.config.ceph
        root_node_uri = cast(str, ceph.root_node)
        cluster = ceph.get("cluster", "ceph")
        ip_remap = ceph.get('ip_remap', {})

        conf = ceph.get("conf")
        key = ceph.get("key")

        # Only a missing (None) setting falls back to the default path; an
        # explicitly empty value suppresses the corresponding CLI option below.
        if conf is None:
            conf = f"/etc/ceph/{cluster}.conf"

        if key is None:
            key = f"/etc/ceph/{cluster}.client.admin.keyring"

        ceph_extra_args = ""

        if conf:
            ceph_extra_args += f" -c '{conf}'"

        if key:
            ceph_extra_args += f" -k '{key}'"

        logger.debug(f"Start discovering ceph nodes from root {root_node_uri}")
        logger.debug(f"cluster={cluster} key={key} conf={conf}")

        root_info = NodeInfo(parse_ssh_uri(root_node_uri), set())

        ceph_params = {"cluster": cluster, "conf": conf, "key": key}

        with setup_rpc(connect(root_info), ctx.rpc_code, ctx.default_rpc_plugins,
                       log_level=ctx.config.rpc_log_level) as node:

            try:
                ips = set()
                for ip, osds_info in get_osds_info(node, ceph_extra_args, thcount=16).items():
                    ip = ip_remap.get(ip, ip)
                    ips.add(ip)
                    creds = ConnCreds(to_ip(cast(str, ip)), user="root")
                    node_info = ctx.merge_node(creds, {'ceph-osd'})
                    node_info.params.setdefault('ceph-osds', []).extend(osd.__dict__.copy() for osd in osds_info)
                    assert 'ceph' not in node_info.params or node_info.params['ceph'] == ceph_params
                    node_info.params['ceph'] = ceph_params
                logger.debug(f"Found {len(ips)} nodes with ceph-osd role")
            except Exception as exc:
                if not ignore_errors:
                    logger.exception("OSD discovery failed")
                    raise StopTestError() from exc
                else:
                    logger.warning(f"OSD discovery failed {exc}")

            try:
                # Count from 1 so that zero monitors is reported as 0, not 1.
                mon_count = 0
                for mon_count, ip in enumerate(get_mons_ips(node, ceph_extra_args), 1):
                    ip = ip_remap.get(ip, ip)
                    creds = ConnCreds(to_ip(cast(str, ip)), user="root")
                    node_info = ctx.merge_node(creds, {'ceph-mon'})
                    assert 'ceph' not in node_info.params or node_info.params['ceph'] == ceph_params
                    node_info.params['ceph'] = ceph_params
                logger.debug(f"Found {mon_count} nodes with ceph-mon role")
            except Exception as exc:
                if not ignore_errors:
                    logger.exception("MON discovery failed")
                    raise StopTestError() from exc
                else:
                    logger.warning(f"MON discovery failed {exc}")
kdanylov aka kodercdfcdaf2017-04-29 10:03:39 +0300112
113
def raw_dev_name(path: str) -> str:
    """Strip a leading "/dev/" and any trailing digits from a device path.

    For example "/dev/sda1" becomes "sda". A path made only of digits
    (after the prefix is removed) yields an empty string.
    """
    prefix = "/dev/"
    name = path[len(prefix):] if path.startswith(prefix) else path

    # Walk back from the end past every trailing digit character.
    end = len(name)
    while end > 0 and name[end - 1].isdigit():
        end -= 1
    return name[:end]
120
121
class CollectCephInfoStage(Stage):
    """Stage that records journal and storage device names of ceph OSD nodes."""

    config_block = 'ceph'
    priority = StepOrder.UPDATE_NODES_INFO

    def run(self, ctx: TestRun) -> None:
        """Fill 'ceph_storage_devs' and 'ceph_journal_devs' in node params.

        Only nodes with the 'ceph-osd' role that do not yet have
        'ceph_storage_devs' are processed. For each entry of the node's
        'ceph-osds' param, the 'journal' and 'storage' paths (when set) are
        resolved through ``node.conn.fs.get_dev_for_file`` and reduced with
        ``raw_dev_name``; the distinct names are stored as lists.
        """
        for node in ctx.nodes:
            params = node.info.params
            if 'ceph_storage_devs' in params or 'ceph-osd' not in node.info.roles:
                continue

            # Insertion order matters: journal is resolved before storage
            # for every OSD, the same order the remote calls are issued in.
            devs_by_kind: Dict[str, Set[str]] = {'journal': set(), 'storage': set()}

            for osd_info in params['ceph-osds']:
                for kind, devs in devs_by_kind.items():
                    file_path = osd_info.get(kind)
                    if not file_path:
                        continue
                    dev_path = node.conn.fs.get_dev_for_file(file_path)
                    # The remote call may hand back bytes; normalize to str.
                    if isinstance(dev_path, bytes):
                        dev_path = dev_path.decode('utf8')
                    devs.add(raw_dev_name(dev_path))

            params['ceph_storage_devs'] = list(devs_by_kind['storage'])
            params['ceph_journal_devs'] = list(devs_by_kind['journal'])
141 node.info.params['ceph_journal_devs'] = list(jdevs)