# blob: 4afff06c3ae9fb2501a04a0c99faaac4fe6a4259
"""Collect data about ceph nodes."""
kdanylov aka koder13e58452018-07-15 02:51:51 +03002import enum
koder aka kdanilov39e449e2016-12-17 15:15:26 +02003import logging
kdanylov aka kodercdfcdaf2017-04-29 10:03:39 +03004from typing import Dict, cast, List, Set
kdanylov aka koder0e0cfcb2017-03-27 22:19:09 +03005from cephlib import discover
6from cephlib.discover import OSDInfo
kdanylov aka koder026e5f22017-05-15 01:04:39 +03007from cephlib.common import to_ip
8from cephlib.node import NodeInfo, IRPCNode
9from cephlib.ssh import ConnCreds, IP, parse_ssh_uri
10from cephlib.node_impl import connect, setup_rpc
11
12from .stage import Stage, StepOrder
13from .test_run_class import TestRun
14from .utils import StopTestError
kdanylov aka koder0e0cfcb2017-03-27 22:19:09 +030015
16
# Module-level logger named "wally"; used by the discovery and info-collection stages in this module.
logger = logging.getLogger("wally")
koder aka kdanilov39e449e2016-12-17 15:15:26 +020018
19
def get_osds_info(node: IRPCNode, ceph_extra_args: str = "", thcount: int = 8) -> Dict[IP, List[OSDInfo]]:
    """Return a mapping from OSD host address to the list of OSD descriptions found for it.

    Calls ``discover.get_osds_nodes`` with ``node.run``, ``ceph_extra_args`` and ``thcount``,
    then wraps every key of the returned mapping in ``IP``. The per-host lists are passed
    through unchanged.
    """
    discovered = discover.get_osds_nodes(node.run, ceph_extra_args, thcount=thcount)

    osds_by_ip: Dict[IP, List[OSDInfo]] = {}
    for raw_ip, host_osds in discovered.items():
        osds_by_ip[IP(raw_ip)] = host_osds
    return osds_by_ip
koder aka kdanilov39e449e2016-12-17 15:15:26 +020024
25
def get_mons_ips(node: IRPCNode, ceph_extra_args: str = "") -> Set[IP]:
    """Return the set of monitor addresses reported by ``discover.get_mons_nodes``.

    Each value of the mapping returned by ``discover.get_mons_nodes`` is unpacked as a pair;
    the first element is wrapped in ``IP`` and the second one is ignored.
    """
    mon_ips: Set[IP] = set()
    for mon_ip, _ in discover.get_mons_nodes(node.run, ceph_extra_args).values():
        mon_ips.add(IP(mon_ip))
    return mon_ips
koder aka kdanilov39e449e2016-12-17 15:15:26 +020029
30
class DiscoverCephStage(Stage):
    """Stage that discovers ceph OSD and MON nodes, starting from the configured root node."""

    config_block = 'ceph'
    priority = StepOrder.DISCOVER

    def run(self, ctx: TestRun) -> None:
        """Discover ceph OSD/MON nodes and merge them into ``ctx``.

        Connects to the node given by the ``root_node`` option of the ``ceph`` config section,
        queries it for OSD and MON addresses and registers each address through
        ``ctx.merge_node`` with the ``ceph-osd`` / ``ceph-mon`` role. Also sets
        ``ctx.ceph_extra_args`` and ``ctx.ceph_master_node``.

        Returns immediately when ``'all_nodes'`` is already present in ``ctx.storage``.

        Raises:
            StopTestError: if ``'metadata'`` discovery is requested (not implemented), if
                ``root_node`` is missing from the ``ceph`` section, or if OSD/MON discovery
                fails while ``'ignore_errors'`` is absent from ``ctx.config.discover``.
        """
        if 'all_nodes' in ctx.storage:
            logger.debug("Skip ceph discovery, use previously discovered nodes")
            return

        if 'metadata' in ctx.config.discover:
            # Not inside an exception handler, so logger.exception() would attach a bogus
            # "NoneType: None" traceback - log a plain error instead.
            logger.error("Ceph metadata discovery is not implemented")
            raise StopTestError()

        ignore_errors = 'ignore_errors' in ctx.config.discover
        ceph = ctx.config.ceph
        try:
            root_node_uri = cast(str, ceph.root_node)
        except AttributeError:
            logger.error("'root_node' option must be provided in 'ceph' config section. " +
                         "It must be the name of the node, which has access to ceph")
            raise StopTestError()

        cluster = ceph.get("cluster", "ceph")
        ip_remap = ceph.get('ip_remap', {})

        # Config and keyring paths default to the conventional per-cluster locations.
        conf = ceph.get("conf")
        if conf is None:
            conf = f"/etc/ceph/{cluster}.conf"

        key = ceph.get("key")
        if key is None:
            key = f"/etc/ceph/{cluster}.client.admin.keyring"

        ctx.ceph_extra_args = f" -c '{conf}' -k '{key}'"

        logger.debug(f"Start discovering ceph nodes from root {root_node_uri}")
        logger.debug(f"cluster={cluster} key={key} conf={conf}")

        root_info = NodeInfo(parse_ssh_uri(root_node_uri), set())

        # The same dict object is attached to every discovered node.
        ceph_params = {"cluster": cluster, "conf": conf, "key": key}

        ssh_user = ctx.config.ssh_opts.get("user")
        ssh_key = ctx.config.ssh_opts.get("key")

        node = ctx.ceph_master_node = setup_rpc(connect(root_info), ctx.rpc_code, ctx.default_rpc_plugins,
                                                log_level=ctx.config.rpc_log_level,
                                                sudo=ctx.config.ssh_opts.get("sudo", False))

        try:
            osd_ips = set()
            for ip, osds_info in get_osds_info(node, ctx.ceph_extra_args, thcount=16).items():
                # Translate the discovered address when the config provides a remapping for it.
                ip = ip_remap.get(ip, ip)
                osd_ips.add(ip)
                creds = ConnCreds(to_ip(cast(str, ip)), user=ssh_user, key_file=ssh_key)
                info = ctx.merge_node(creds, {'ceph-osd'})
                info.params.setdefault('ceph-osds', []).extend(osd.__dict__.copy() for osd in osds_info)
                assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
                info.params['ceph'] = ceph_params
            logger.debug(f"Found {len(osd_ips)} nodes with ceph-osd role")
        except Exception as exc:
            if not ignore_errors:
                logger.exception("OSD discovery failed")
                raise StopTestError()
            else:
                logger.warning(f"OSD discovery failed {exc}")

        try:
            # Counting from 1 keeps the reported number at 0 when no monitor is returned.
            mon_count = 0
            for mon_count, ip in enumerate(get_mons_ips(node, ctx.ceph_extra_args), start=1):
                ip = ip_remap.get(ip, ip)
                creds = ConnCreds(to_ip(cast(str, ip)), user=ssh_user, key_file=ssh_key)
                info = ctx.merge_node(creds, {'ceph-mon'})
                assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
                info.params['ceph'] = ceph_params
            logger.debug(f"Found {mon_count} nodes with ceph-mon role")
        except Exception as exc:
            if not ignore_errors:
                logger.exception("MON discovery failed")
                raise StopTestError()
            else:
                logger.warning(f"MON discovery failed {exc}")
kdanylov aka kodercdfcdaf2017-04-29 10:03:39 +0300114
115
def raw_dev_name(path: str) -> str:
    """Return the whole-disk device name for a device or partition path.

    A leading ``/dev/`` is dropped. For NVMe (``nvme<X>n<Y>``) and MMC (``mmcblk<X>``) devices,
    whose disk names themselves end in a digit, only an optional ``p<N>`` partition suffix is
    removed. For every other name all trailing digits are stripped (``sda1`` -> ``sda``).

    NOTE(review): other digit-suffixed whole devices (e.g. ``md0``, ``dm-0``) still lose their
    trailing digits, as before - confirm whether callers need them preserved.
    """
    import re  # local import: `re` is not among this module's top-level imports

    if path.startswith("/dev/"):
        path = path[5:]

    # Blindly stripping digits would turn "nvme0n1p1" into "nvme0n1p" and "nvme0n1" into "nvme0n".
    digit_named_disk = re.fullmatch(r"(nvme[0-9]+n[0-9]+|mmcblk[0-9]+)(?:p[0-9]+)?", path)
    if digit_named_disk is not None:
        return digit_named_disk.group(1)

    while path and path[-1].isdigit():
        path = path[:-1]
    return path
122
123
class CollectCephInfoStage(Stage):
    """Stage that records which devices hold the OSD data and journals of every ``ceph-osd`` node."""

    config_block = 'ceph'
    priority = StepOrder.UPDATE_NODES_INFO

    @staticmethod
    def _device_of(node, file_path: str) -> str:
        """Return the raw device name that ``node`` reports for ``file_path``."""
        dev = node.conn.fs.get_dev_for_file(file_path)
        if isinstance(dev, bytes):
            dev = dev.decode('utf8')
        return raw_dev_name(dev)

    def run(self, ctx: TestRun) -> None:
        """Fill ``ceph_storage_devs`` and ``ceph_journal_devs`` in the params of each OSD node.

        Nodes that already carry ``ceph_storage_devs`` or lack the ``ceph-osd`` role are skipped.
        """
        for node in ctx.nodes:
            params = node.info.params
            if 'ceph_storage_devs' in params or 'ceph-osd' not in node.info.roles:
                continue

            journal_devs: Set[str] = set()
            storage_devs: Set[str] = set()

            for osd in params['ceph-osds']:
                if osd['bluestore'] is None:
                    # Backend type unknown so far: read it from the "type" file in the OSD storage dir.
                    raw_type = node.conn.fs.get_file(osd['storage'] + "/type", compress=False)
                    osd['bluestore'] = raw_type.decode('utf8').strip() == 'bluestore'

                if osd['bluestore']:
                    # block.db and block.wal are accounted as journal devices, block as storage.
                    bluestore_files = (('block.db', journal_devs),
                                       ('block.wal', journal_devs),
                                       ('block', storage_devs))
                    for file_name, target in bluestore_files:
                        target.add(self._device_of(node, f"{osd['storage']}/{file_name}"))
                else:
                    for field, target in (('journal', journal_devs), ('storage', storage_devs)):
                        location = osd.get(field)
                        if location:
                            target.add(self._device_of(node, location))

            params['ceph_storage_devs'] = list(storage_devs)
            params['ceph_journal_devs'] = list(journal_devs)