Move Ceph OSD/monitor discovery code to cephlib
diff --git a/wally/ceph.py b/wally/ceph.py
index a374ed6..9734baa 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -14,83 +14,23 @@
from .utils import StopTestError, to_ip
+from cephlib import discover
+from cephlib.discover import OSDInfo
+
+
logger = logging.getLogger("wally")
-class OSDInfo:
- def __init__(self, id: int, journal: str = None, storage: str = None) -> None:
- self.id = id
- self.journal = journal
- self.storage = storage
-
-
-def get_osds_info(node: IRPCNode, conf: str, key: str) -> Dict[IP, List[OSDInfo]]:
+def get_osds_info(node: IRPCNode, ceph_extra_args: str = "") -> Dict[IP, List[OSDInfo]]:
"""Get set of osd's ip"""
-
- data = node.run("ceph -c {} -k {} --format json osd dump".format(conf, key))
- try:
- jdata = json.loads(data)
- except:
- open("/tmp/ceph-out.json", "w").write(data)
- raise
- ips = {} # type: Dict[IP, List[OSDInfo]]
- first_error = True
- for osd_data in jdata["osds"]:
- osd_id = int(osd_data["osd"])
- if "public_addr" not in osd_data:
- if first_error:
- logger.warning("No 'public_addr' field in 'ceph osd dump' output for osd %s" +
- "(all subsequent errors omitted)", osd_id)
- first_error = False
- else:
- ip_port = osd_data["public_addr"]
- if '/' in ip_port:
- ip_port = ip_port.split("/", 1)[0]
- ip = IP(ip_port.split(":")[0])
-
- osd_journal_path = None # type: Optional[str]
- osd_data_path = None # type: Optional[str]
-
- # TODO: parallelize this!
- osd_cfg = node.run("ceph -n osd.{} --show-config".format(osd_id))
- for line in osd_cfg.split("\n"):
- if line.startswith("osd_journal ="):
- osd_journal_path = line.split("=")[1].strip()
- elif line.startswith("osd_data ="):
- osd_data_path = line.split("=")[1].strip()
-
- if osd_data_path is None or osd_journal_path is None:
- open("/tmp/ceph-out.json", "w").write(osd_cfg)
- logger.error("Can't detect osd %s journal or storage path", osd_id)
- raise StopTestError()
-
- ips.setdefault(ip, []).append(OSDInfo(osd_id,
- journal=osd_journal_path,
- storage=osd_data_path))
- return ips
+    osd_nodes = discover.get_osds_nodes(node.run, ceph_extra_args)
+    pairs = osd_nodes.items() if isinstance(osd_nodes, dict) else osd_nodes  # a bare dict iterates keys only
+    return {IP(ip): osd_info_list for ip, osd_info_list in pairs}
-def get_mons_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
+def get_mons_ips(node: IRPCNode, ceph_extra_args: str = "") -> Set[IP]:
"""Return mon ip set"""
-
- data = node.run("ceph -c {} -k {} --format json mon_status".format(conf, key))
- jdata = json.loads(data)
- ips = set() # type: Set[IP]
- first_error = True
- for mon_data in jdata["monmap"]["mons"]:
- if "addr" not in mon_data:
- if first_error:
- mon_name = mon_data.get("name", "<MON_NAME_MISSED>")
- logger.warning("No 'addr' field in 'ceph mon_status' output for mon %s" +
- "(all subsequent errors omitted)", mon_name)
- first_error = False
- else:
- ip_port = mon_data["addr"]
- if '/' in ip_port:
- ip_port = ip_port.split("/", 1)[0]
- ips.add(IP(ip_port.split(":")[0]))
-
- return ips
+ return {IP(ip) for ip in discover.get_mons_nodes(node.run, ceph_extra_args).values()}
class DiscoverCephStage(Stage):
@@ -112,33 +52,39 @@
ceph = ctx.config.ceph
root_node_uri = cast(str, ceph.root_node)
cluster = ceph.get("cluster", "ceph")
+
conf = ceph.get("conf")
key = ceph.get("key")
- logger.debug("Start discovering ceph nodes from root %s", root_node_uri)
- logger.debug("cluster=%s key=%s conf=%s", cluster, conf, key)
-
- info = NodeInfo(parse_ssh_uri(root_node_uri), set())
-
if conf is None:
conf = "/etc/ceph/{}.conf".format(cluster)
if key is None:
key = "/etc/ceph/{}.client.admin.keyring".format(cluster)
+ ceph_extra_args = ""
+
+ if conf:
+ ceph_extra_args += " -c '{}'".format(conf)
+
+ if key:
+ ceph_extra_args += " -k '{}'".format(key)
+
+ logger.debug("Start discovering ceph nodes from root %s", root_node_uri)
+        logger.debug("cluster=%s conf=%s key=%s", cluster, conf, key)
+
+ info = NodeInfo(parse_ssh_uri(root_node_uri), set())
+
ceph_params = {"cluster": cluster, "conf": conf, "key": key}
- with setup_rpc(connect(info),
- ctx.rpc_code,
- ctx.default_rpc_plugins,
+ with setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins,
log_level=ctx.config.rpc_log_level) as node:
ssh_key = node.get_file_content("~/.ssh/id_rsa")
-
try:
ips = set()
- for ip, osds_info in get_osds_info(node, conf, key).items():
+ for ip, osds_info in get_osds_info(node, ceph_extra_args).items():
ips.add(ip)
creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
info = ctx.merge_node(creds, {'ceph-osd'})
@@ -156,7 +102,7 @@
try:
counter = 0
- for counter, ip in enumerate(get_mons_ips(node, conf, key)):
+ for counter, ip in enumerate(get_mons_ips(node, ceph_extra_args)):
creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
info = ctx.merge_node(creds, {'ceph-mon'})
assert 'ceph' not in info.params or info.params['ceph'] == ceph_params