blob: 3527db33b50e234844eb86eb946c72a3aa9c7809 [file] [log] [blame]
koder aka kdanilov39e449e2016-12-17 15:15:26 +02001""" Collect data about ceph nodes"""
kdanylov aka kodercdfcdaf2017-04-29 10:03:39 +03002import os
koder aka kdanilov39e449e2016-12-17 15:15:26 +02003import logging
kdanylov aka kodercdfcdaf2017-04-29 10:03:39 +03004from typing import Dict, cast, List, Set
koder aka kdanilov39e449e2016-12-17 15:15:26 +02005
6
7from .node_interfaces import NodeInfo, IRPCNode
8from .ssh_utils import ConnCreds
9from .common_types import IP
10from .stage import Stage, StepOrder
11from .test_run_class import TestRun
12from .ssh_utils import parse_ssh_uri
13from .node import connect, setup_rpc
koder aka kdanilovbbbe1dc2016-12-20 01:19:56 +020014from .utils import StopTestError, to_ip
koder aka kdanilov39e449e2016-12-17 15:15:26 +020015
16
kdanylov aka koder0e0cfcb2017-03-27 22:19:09 +030017from cephlib import discover
18from cephlib.discover import OSDInfo
19
20
koder aka kdanilov962ee5f2016-12-19 02:40:08 +020021logger = logging.getLogger("wally")
koder aka kdanilov39e449e2016-12-17 15:15:26 +020022
23
def get_osds_info(node: IRPCNode, ceph_extra_args: str = "", thcount: int = 8) -> Dict[IP, List[OSDInfo]]:
    """Return the OSD descriptions reported by cephlib, grouped by node address.

    node - RPC node whose ``run`` callable is handed to ``discover.get_osds_nodes``
    ceph_extra_args - extra command line arguments forwarded to cephlib unchanged
    thcount - forwarded to ``discover.get_osds_nodes`` as ``thcount``
              (presumably the number of worker threads - confirm against cephlib)

    Returns a dict mapping ``IP(address)`` to the list of OSDInfo found for
    that address.
    """
    osds_per_host = discover.get_osds_nodes(node.run, ceph_extra_args, thcount=thcount)
    return {IP(ip): osd_info_list for ip, osd_info_list in osds_per_host.items()}
koder aka kdanilov39e449e2016-12-17 15:15:26 +020029
30
def get_mons_ips(node: IRPCNode, ceph_extra_args: str = "") -> Set[IP]:
    """Collect the addresses of all monitors reported by cephlib into a set.

    Each value returned by ``discover.get_mons_nodes`` is unpacked as a pair;
    only its first element (the address) is kept, wrapped into ``IP``.
    """
    mon_ips = set()  # type: Set[IP]
    mons = discover.get_mons_nodes(node.run, ceph_extra_args)
    for mon_ip, _ in mons.values():
        mon_ips.add(IP(mon_ip))
    return mon_ips
koder aka kdanilov39e449e2016-12-17 15:15:26 +020034
35
class DiscoverCephStage(Stage):
    """Discover ceph OSD and MON nodes, starting from the configured root node."""

    config_block = 'ceph'
    priority = StepOrder.DISCOVER

    def run(self, ctx: TestRun) -> None:
        """Register every discovered ceph node in ``ctx`` via ``ctx.merge_node``.

        Nothing is returned. The stage is skipped when 'ceph' is not listed in
        ``ctx.config.discover`` or when 'all_nodes' is already present in
        ``ctx.storage``.

        Raises StopTestError when metadata discovery is requested (not
        implemented) or when OSD/MON discovery fails and 'ignore_errors' is
        not listed in ``ctx.config.discover``.
        """

        if 'ceph' not in ctx.config.discover:
            logger.debug("Skip ceph discovery due to config setting")
            return

        if 'all_nodes' in ctx.storage:
            logger.debug("Skip ceph discovery, use previously discovered nodes")
            return

        if 'metadata' in ctx.config.discover:
            # No exception is being handled here, so logger.exception() would
            # only append a meaningless "NoneType: None" traceback.
            logger.error("Ceph metadata discovery is not implemented")
            raise StopTestError()

        ignore_errors = 'ignore_errors' in ctx.config.discover
        ceph = ctx.config.ceph
        root_node_uri = cast(str, ceph.root_node)
        cluster = ceph.get("cluster", "ceph")

        conf = ceph.get("conf")
        key = ceph.get("key")

        # Fall back to the default per-cluster file locations
        if conf is None:
            conf = "/etc/ceph/{}.conf".format(cluster)

        if key is None:
            key = "/etc/ceph/{}.client.admin.keyring".format(cluster)

        ceph_extra_args = ""

        # An explicitly empty conf/key in the config suppresses the option
        if conf:
            ceph_extra_args += " -c '{}'".format(conf)

        if key:
            ceph_extra_args += " -k '{}'".format(key)

        logger.debug("Start discovering ceph nodes from root %s", root_node_uri)
        logger.debug("cluster=%s key=%s conf=%s", cluster, key, conf)

        info = NodeInfo(parse_ssh_uri(root_node_uri), set())

        ceph_params = {"cluster": cluster, "conf": conf, "key": key}

        # NOTE(review): site-specific translation from the addresses reported
        # by ceph to the addresses used for connecting. Addresses missing from
        # this table are used as reported.
        ip_remap = {
            '10.8.0.4': '172.16.164.71',
            '10.8.0.3': '172.16.164.72',
            '10.8.0.2': '172.16.164.73',
            '10.8.0.5': '172.16.164.74',
            '10.8.0.6': '172.16.164.75',
            '10.8.0.7': '172.16.164.76',
            '10.8.0.8': '172.16.164.77',
            '10.8.0.9': '172.16.164.78',
        }

        with setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins,
                       log_level=ctx.config.rpc_log_level) as node:

            # NOTE(review): nodes are registered with user "root" and no
            # explicit key; reusing the root node's ~/.ssh/id_rsa was tried
            # and disabled in an earlier revision.

            try:
                ips = set()
                for ip, osds_info in get_osds_info(node, ceph_extra_args, thcount=16).items():
                    ip = ip_remap.get(ip, ip)
                    ips.add(ip)
                    creds = ConnCreds(to_ip(cast(str, ip)), user="root")
                    info = ctx.merge_node(creds, {'ceph-osd'})
                    info.params.setdefault('ceph-osds', []).extend(osd.__dict__.copy() for osd in osds_info)
                    # A node already tagged with different ceph settings is a
                    # conflict; the AssertionError is handled by the except
                    # clause below like any other discovery failure.
                    assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
                    info.params['ceph'] = ceph_params

                logger.debug("Found %s nodes with ceph-osd role", len(ips))
            except Exception as exc:
                if not ignore_errors:
                    logger.exception("OSD discovery failed")
                    raise StopTestError()
                else:
                    logger.warning("OSD discovery failed %s", exc)

            try:
                # Counted explicitly so that an empty monitor set is reported as 0
                mon_count = 0
                for ip in get_mons_ips(node, ceph_extra_args):
                    ip = ip_remap.get(ip, ip)
                    creds = ConnCreds(to_ip(cast(str, ip)), user="root")
                    info = ctx.merge_node(creds, {'ceph-mon'})
                    assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
                    info.params['ceph'] = ceph_params
                    mon_count += 1
                logger.debug("Found %s nodes with ceph-mon role", mon_count)
            except Exception as exc:
                if not ignore_errors:
                    logger.exception("MON discovery failed")
                    raise StopTestError()
                else:
                    logger.warning("MON discovery failed %s", exc)
kdanylov aka kodercdfcdaf2017-04-29 10:03:39 +0300137
138
def raw_dev_name(path):
    """Return the name of the whole disk behind a device or partition path.

    A leading "/dev/" is dropped, then the partition suffix is removed:

    * "sda1" -> "sda": plain trailing partition number;
    * "nvme0n1p1" -> "nvme0n1": when the disk name itself ends with a digit
      the partition is written as "<disk>p<N>";
    * "nvme0n1", "mmcblk0", "md0", "dm-3", ... are whole devices whose names
      end with a digit and are returned unchanged.
    """
    name = path[5:] if path.startswith("/dev/") else path

    # Find where the trailing run of digits starts
    end = len(name)
    while end and name[end - 1].isdigit():
        end -= 1

    if end == len(name):
        # No trailing digits - nothing to strip
        return name

    stem = name[:end]

    # "<disk ending with a digit>p<N>": drop both the "p" and the number
    if end >= 2 and stem[-1] == "p" and stem[-2].isdigit():
        return stem[:-1]

    # Device families numbered directly after the prefix: the trailing
    # digits belong to the disk name, they are not a partition number.
    digit_named_disks = ("nvme", "mmcblk", "md", "loop", "nbd", "rbd", "dm-")
    for prefix in digit_named_disks:
        if name.startswith(prefix) and name[len(prefix)].isdigit():
            return name

    return stem
145
146
class FillCephInfoStage(Stage):
    """Fill in the storage and journal device names of every ceph-osd node."""

    config_block = 'ceph'
    priority = StepOrder.UPDATE_NODES_INFO

    def run(self, ctx: TestRun) -> None:
        """Store 'ceph_storage_devs' and 'ceph_journal_devs' in the node params.

        Nodes that already have 'ceph_storage_devs' and nodes without the
        'ceph-osd' role are left untouched.
        """
        for node in ctx.nodes:
            if 'ceph_storage_devs' in node.info.params:
                continue

            if 'ceph-osd' not in node.info.roles:
                continue

            # One set of raw device names per kind of OSD path
            found = {'journal': set(), 'storage': set()}

            for osd in node.info.params['ceph-osds']:
                for kind in ('journal', 'storage'):
                    file_path = osd.get(kind)
                    if not file_path:
                        continue

                    dev = node.conn.fs.get_dev_for_file(file_path)
                    # The call may hand back bytes; normalize to str
                    if isinstance(dev, bytes):
                        dev = dev.decode('utf8')
                    found[kind].add(raw_dev_name(dev))

            node.info.params['ceph_storage_devs'] = list(found['storage'])
            node.info.params['ceph_journal_devs'] = list(found['journal'])