a lot of fixes, improve visualization speed, add c++ code
diff --git a/wally/ceph.py b/wally/ceph.py
index 36d6b15..3527db3 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -1,7 +1,7 @@
""" Collect data about ceph nodes"""
-import json
+import os
import logging
-from typing import Dict, cast, List, Set, Optional
+from typing import Dict, cast, List, Set
from .node_interfaces import NodeInfo, IRPCNode
@@ -21,16 +21,16 @@
logger = logging.getLogger("wally")
-def get_osds_info(node: IRPCNode, ceph_extra_args: str = "") -> Dict[IP, List[OSDInfo]]:
+def get_osds_info(node: IRPCNode, ceph_extra_args: str = "", thcount: int = 8) -> Dict[IP, List[OSDInfo]]:
"""Get set of osd's ip"""
res = {} # type: Dict[IP, List[OSDInfo]]
return {IP(ip): osd_info_list
- for ip, osd_info_list in discover.get_osds_nodes(node.run, ceph_extra_args)}
+ for ip, osd_info_list in discover.get_osds_nodes(node.run, ceph_extra_args, thcount=thcount).items()}
def get_mons_ips(node: IRPCNode, ceph_extra_args: str = "") -> Set[IP]:
"""Return mon ip set"""
- return {IP(ip) for ip in discover.get_mons_nodes(node.run, ceph_extra_args).values()}
+ return {IP(ip) for ip, _ in discover.get_mons_nodes(node.run, ceph_extra_args).values()}
class DiscoverCephStage(Stage):
@@ -40,7 +40,8 @@
def run(self, ctx: TestRun) -> None:
"""Return list of ceph's nodes NodeInfo"""
- if 'ceph' not in ctx.config.discovery:
+ if 'ceph' not in ctx.config.discover:
+ print(ctx.config.discover)
logger.debug("Skip ceph discovery due to config setting")
return
@@ -48,11 +49,11 @@
logger.debug("Skip ceph discovery, use previously discovered nodes")
return
- if 'metadata' in ctx.config.discovery:
+ if 'metadata' in ctx.config.discover:
logger.exception("Ceph metadata discovery is not implemented")
raise StopTestError()
- ignore_errors = 'ignore_errors' in ctx.config.discovery
+ ignore_errors = 'ignore_errors' in ctx.config.discover
ceph = ctx.config.ceph
root_node_uri = cast(str, ceph.root_node)
cluster = ceph.get("cluster", "ceph")
@@ -81,18 +82,31 @@
ceph_params = {"cluster": cluster, "conf": conf, "key": key}
+ ip_remap = {
+ '10.8.0.4': '172.16.164.71',
+ '10.8.0.3': '172.16.164.72',
+ '10.8.0.2': '172.16.164.73',
+ '10.8.0.5': '172.16.164.74',
+ '10.8.0.6': '172.16.164.75',
+ '10.8.0.7': '172.16.164.76',
+ '10.8.0.8': '172.16.164.77',
+ '10.8.0.9': '172.16.164.78',
+ }
+
with setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins,
log_level=ctx.config.rpc_log_level) as node:
- ssh_key = node.get_file_content("~/.ssh/id_rsa")
+ # ssh_key = node.get_file_content("~/.ssh/id_rsa", expanduser=True)
try:
ips = set()
- for ip, osds_info in get_osds_info(node, ceph_extra_args).items():
+ for ip, osds_info in get_osds_info(node, ceph_extra_args, thcount=16).items():
+ ip = ip_remap[ip]
ips.add(ip)
- creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
+ # creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
+ creds = ConnCreds(to_ip(cast(str, ip)), user="root")
info = ctx.merge_node(creds, {'ceph-osd'})
- info.params.setdefault('ceph-osds', []).extend(osds_info)
+ info.params.setdefault('ceph-osds', []).extend(info.__dict__.copy() for info in osds_info)
assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
info.params['ceph'] = ceph_params
@@ -107,7 +121,9 @@
try:
counter = 0
for counter, ip in enumerate(get_mons_ips(node, ceph_extra_args)):
- creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
+ ip = ip_remap[ip]
+ # creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
+ creds = ConnCreds(to_ip(cast(str, ip)), user="root")
info = ctx.merge_node(creds, {'ceph-mon'})
assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
info.params['ceph'] = ceph_params
@@ -118,3 +134,33 @@
raise StopTestError()
else:
logger.warning("MON discovery failed %s", exc)
+
+
+def raw_dev_name(path):
+    """Return the whole-disk device name for a block device or partition path.
+
+    An optional leading "/dev/" is dropped, then the partition suffix is
+    removed: "/dev/sda1" -> "sda", "/dev/nvme0n1p2" -> "nvme0n1".
+
+    :param path: device path or bare device name, e.g. "/dev/sdb2" or "sdb2"
+    :return: device name without the "/dev/" prefix and partition number
+    """
+    # Local import: keeps the helper self-contained without touching the
+    # module-level import list.
+    import re
+    if path.startswith("/dev/"):
+        path = path[5:]
+    # Disks whose own name ends in a digit mark partitions with a 'p<N>'
+    # suffix; stripping trailing digits blindly would turn 'nvme0n1' into
+    # 'nvme0n' and 'nvme0n1p2' into 'nvme0n1p'.
+    numbered = re.fullmatch(r"(nvme\d+n\d+|mmcblk\d+|loop\d+|md\d+|nbd\d+)(?:p\d+)?", path)
+    if numbered:
+        return numbered.group(1)
+    # Classic 'sdX<N>' / 'vdX<N>' style names: the partition number is simply
+    # appended to the disk name.
+    while path and path[-1].isdigit():
+        path = path[:-1]
+    return path
+
+
+class FillCephInfoStage(Stage):
+    """Stage that records which block devices hold each OSD node's journal and storage."""
+
+    config_block = 'ceph'
+    priority = StepOrder.UPDATE_NODES_INFO
+
+    def run(self, ctx: TestRun) -> None:
+        """Fill 'ceph_storage_devs' and 'ceph_journal_devs' in the params of OSD nodes.
+
+        Nodes that already carry 'ceph_storage_devs', or that do not have the
+        'ceph-osd' role, are left untouched.
+        """
+        for node in ctx.nodes:
+            params = node.info.params
+            if 'ceph_storage_devs' in params or 'ceph-osd' not in node.info.roles:
+                continue
+
+            journal_devs = set()
+            storage_devs = set()
+            targets = [('journal', journal_devs), ('storage', storage_devs)]
+
+            for osd_info in params['ceph-osds']:
+                for kind, collected in targets:
+                    file_path = osd_info.get(kind)
+                    if not file_path:
+                        continue
+                    # Ask the node which device backs this path.
+                    dev_path = node.conn.fs.get_dev_for_file(file_path)
+                    if isinstance(dev_path, bytes):
+                        dev_path = dev_path.decode('utf8')
+                    collected.add(raw_dev_name(dev_path))
+
+            params['ceph_storage_devs'] = list(storage_devs)
+            params['ceph_journal_devs'] = list(journal_devs)