import bz2
import time
import array
import logging
from typing import Dict, Tuple, Optional, Any
import numpy
from cephlib import sensors_rpc_plugin
from cephlib.units import b2ssize
from cephlib.wally_storage import WallyDB
from . import utils
from .test_run_class import TestRun
from .result_classes import DataSource
from .stage import Stage, StepOrder
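
# source code of the sensors RPC plugin; it is uploaded verbatim to the monitored nodes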
plugin_fname = sensors_rpc_plugin.__file__.rsplit(".", 1)[0] + ".py"
with open(plugin_fname, "rb") as fobj:
    SENSORS_PLUGIN_CODE = fobj.read()  # type: bytes
logger = logging.getLogger("wally")


# TODO(koder): if a node has more than one role, the sensor settings might be incorrect
class StartSensorsStage(Stage):
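    """Deploy the sensors RPC plugin to all matching nodes and start monitoring."""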
priority = StepOrder.START_SENSORS
config_block = 'sensors'
def run(self, ctx: TestRun) -> None:
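        # sensor samples are exchanged as array.array('L') blobs, so 'L' items
        # must be 64 bits wide for the data to decode correctly (hence this check)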
if array.array('L').itemsize != 8:
            message = "Python array.array('L') items must be 8 bytes in size, not {}." + \
                      " Can't provide sensors on this platform. Disable sensors in the config and retry"
            logger.critical(message.format(array.array('L').itemsize))
raise utils.StopTestError()
        # TODO: this needs a careful fix
        # sensors config is either:
        #   role:
        #       sensor: [str]
        # or:
        #   role:
        #       sensor:
        #           allowed: [str]
        #           disallowed: [str]
        #           params: Any
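        # an illustrative config fragment (hypothetical role and sensor names):
        #   testnode: system-cpu, block-io     <- comma-separated string form
        #   ceph-osd: [block-io, ceph]         <- list form; 'ceph' maps to {}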
per_role_config = {} # type: Dict[str, Dict[str, str]]
for name, val in ctx.config.sensors.roles_mapping.raw().items():
if isinstance(val, str):
val = {vl.strip(): (".*" if vl.strip() != 'ceph' else {}) for vl in val.split(",")}
elif isinstance(val, list):
val = {vl: (".*" if vl != 'ceph' else {}) for vl in val}
per_role_config[name] = val
        if 'all' in per_role_config:
            all_vl = per_role_config.pop('all')
            all_roles = set(per_role_config)
            for node in ctx.nodes:
                all_roles.update(node.info.roles)  # type: ignore
            # merge the 'all' defaults into every known role; per-role
            # settings override the defaults
            for name in all_roles:
                new_vals = all_vl.copy()
                new_vals.update(per_role_config.get(name, {}))
                per_role_config[name] = new_vals
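            # e.g. {'all': {'system-cpu': '.*'}, 'osd': {'block-io': '.*'}}
            # yields 'system-cpu' for every role, plus 'block-io' on 'osd'
            # (hypothetical names, for illustration only)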
for node in ctx.nodes:
node_cfg = {} # type: Dict[str, Dict[str, str]]
for role in node.info.roles:
node_cfg.update(per_role_config.get(role, {})) # type: ignore
nid = node.node_id
if node_cfg:
# ceph requires additional settings
if 'ceph' in node_cfg:
node_cfg['ceph'].update(node.info.params['ceph'])
node_cfg['ceph']['osds'] = [osd['id'] for osd in node.info.params['ceph-osds']] # type: ignore
logger.debug("Setting up sensors RPC plugin for node %s", nid)
node.upload_plugin("sensors", SENSORS_PLUGIN_CODE)
ctx.sensors_run_on.add(nid)
logger.debug("Start monitoring node %s", nid)
node.conn.sensors.start(node_cfg)
else:
logger.debug("Skip monitoring node %s, as no sensors selected", nid)


def collect_sensors_data(ctx: TestRun,
                         stop: bool = False,
                         before_test: bool = False) -> None:
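    """Fetch accumulated sensor data from all monitored nodes and persist it.

    stop        -- also stop the remote sensor loops (final collection)
    before_test -- warm-up call before the test; only skips the progress log
    """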
total_sz = 0
    # ceph pg and pool stats are collected separately
cluster_metrics = getattr(ctx.config.sensors, 'cluster', [])
pgs_io = 'ceph-pgs-io' in cluster_metrics
pools_io = 'ceph-pools-io' in cluster_metrics
if pgs_io or pools_io:
assert ctx.ceph_master_node is not None
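        # run the cluster-wide dumps in a worker thread so they overlap with
        # the per-node collection loop below; "ceph pg dump" can be slow on
        # large clusters (assumption)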
def collect() -> Tuple[Optional[Any], Optional[Any]]:
pg_dump = ctx.ceph_master_node.run(f"ceph {ctx.ceph_extra_args} pg dump --format json") if pgs_io else None
pools_dump = ctx.ceph_master_node.run(f"rados {ctx.ceph_extra_args} df --format json") if pools_io else None
return pg_dump, pools_dump
future = ctx.get_pool().submit(collect)
else:
future = None
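    # a single timestamp keys all cluster-wide blobs stored by this collection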
ctime = int(time.time())
if not before_test:
logger.info("Start loading sensors")
for node in ctx.nodes:
node_id = node.node_id
if node_id in ctx.sensors_run_on:
func = node.conn.sensors.stop if stop else node.conn.sensors.get_updates
            # rough estimate of the transferred size: the payload blobs plus
            # the offset map keys and ~16 bytes of per-entry overhead
            offset_map, compressed_blob, compressed_collected_at_b = func()
            data_tpl = (offset_map, compressed_blob, compressed_collected_at_b)
            total_sz += len(compressed_blob) + len(compressed_collected_at_b) + \
                        sum(map(len, offset_map)) + 16 * len(offset_map)
for path, value, is_array, units in sensors_rpc_plugin.unpack_rpc_updates(data_tpl):
if path == 'collected_at':
ds = DataSource(node_id=node_id, metric='collected_at', tag='csv')
ctx.rstorage.append_sensor(numpy.array(value), ds, units)
else:
sensor, dev, metric = path.split(".")
ds = DataSource(node_id=node_id, metric=metric, dev=dev, sensor=sensor, tag='csv')
if is_array:
ctx.rstorage.append_sensor(numpy.array(value), ds, units)
else:
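                        # scalar ceph payloads: 'historic' blobs (presumably
                        # OSD historic-ops dumps) are bz2-compressed before
                        # storage, while 'perf_dump' is kept as plain text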
if metric == 'historic':
value = bz2.compress(value)
tag = 'bz2'
else:
assert metric == 'perf_dump'
tag = 'txt'
ctx.storage.put_raw(value, WallyDB.ceph_metric(node_id=node_id,
metric=metric,
time=ctime,
tag=tag))
if future:
pgs_info, pools_info = future.result()
if pgs_info:
total_sz += len(pgs_info)
ctx.storage.put_raw(bz2.compress(pgs_info.encode('utf8')), WallyDB.pgs_io.format(time=ctime))
if pools_info:
total_sz += len(pools_info)
ctx.storage.put_raw(bz2.compress(pools_info.encode('utf8')), WallyDB.pools_io.format(time=ctime))
logger.info("Download %sB of sensors data", b2ssize(total_sz))


class CollectSensorsStage(Stage):
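    """Final sensor collection: stop the remote sensor loops and store the data."""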
priority = StepOrder.COLLECT_SENSORS
config_block = 'sensors'
def run(self, ctx: TestRun) -> None:
        collect_sensors_data(ctx, stop=True, before_test=False)