Skeleton and sensors work
diff --git a/wally/sensors.py b/wally/sensors.py
index c86aeb4..4f00a3e 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -1,25 +1,34 @@
-from typing import List, Dict, Tuple, Any
+import array
+import logging
+from typing import List, Dict, Tuple
+from . import utils
from .test_run_class import TestRun
from . import sensors_rpc_plugin
from .stage import Stage, StepOrder
plugin_fname = sensors_rpc_plugin.__file__.rsplit(".", 1)[0] + ".py"
-SENSORS_PLUGIN_CODE = open(plugin_fname).read()
+SENSORS_PLUGIN_CODE = open(plugin_fname, "rb").read()
-# TODO(koder): in case if node has more than one role sensor settigns might be incorrect
+logger = logging.getLogger("wally")
+
+# TODO(koder): in case if node has more than one role sensor settings might be incorrect
class StartSensorsStage(Stage):
priority = StepOrder.START_SENSORS
config_block = 'sensors'
def run(self, ctx: TestRun) -> None:
- if 'sensors' not in ctx.config:
- return
+ if array.array('L').itemsize != 8:
+ message = "Python array.array('L') items should be 8 bytes in size, not {}." + \
+ " Can't provide sensors on this platform. Disable sensors in config and retry"
+ logger.critical(message.format(array.array('L').itemsize))
+ raise utils.StopTestError()
per_role_config = {} # type: Dict[str, Dict[str, str]]
- for name, val in ctx.config['sensors'].copy():
+
+ for name, val in ctx.config.sensors.roles_mapping.raw().items():
if isinstance(val, str):
val = {vl.strip(): ".*" for vl in val.split(",")}
elif isinstance(val, list):
@@ -39,14 +48,47 @@
per_role_config[name] = new_vals
for node in ctx.nodes:
- node_cfg = {} # type: Dict[str, str]
+ node_cfg = {} # type: Dict[str, Dict[str, str]]
for role in node.info.roles:
node_cfg.update(per_role_config.get(role, {}))
+ nid = node.info.node_id()
if node_cfg:
- node.conn.upload_plugin(SENSORS_PLUGIN_CODE)
- ctx.sensors_run_on.add(node.info.node_id())
- node.conn.sensors.start()
+ # ceph requires additional settings
+ if 'ceph' in node_cfg:
+ node_cfg['ceph'].update(node.info.params['ceph'])
+ node_cfg['ceph']['osds'] = [osd.id for osd in node.info.params['ceph-osds']]
+
+ logger.debug("Setting up sensort RPC plugin for node %s", nid)
+ node.upload_plugin("sensors", SENSORS_PLUGIN_CODE)
+ ctx.sensors_run_on.add(nid)
+ logger.debug("Start monitoring node %s", nid)
+ node.conn.sensors.start(node_cfg)
+ else:
+ logger.debug("Skip monitoring node %s, as no sensors selected", nid)
+
+
+def collect_sensors_data(ctx: TestRun, stop: bool = False):
+ for node in ctx.nodes:
+ node_id = node.info.node_id()
+ if node_id in ctx.sensors_run_on:
+
+ if stop:
+ func = node.conn.sensors.stop
+ else:
+ func = node.conn.sensors.get_updates
+
+ data, collected_at_b = func() # type: Dict[Tuple[bytes, bytes], List[int]], List[float]
+
+ collected_at = array.array('f')
+ collected_at.frombytes(collected_at_b)
+
+ mstore = ctx.storage.sub_storage("metric", node_id)
+ for (source_name, sensor_name), values_b in data.items():
+ values = array.array('Q')
+ values.frombytes(values_b)
+ mstore.append(values, source_name, sensor_name.decode("utf8"))
+ mstore.append(collected_at, "collected_at")
class CollectSensorsStage(Stage):
@@ -54,16 +96,7 @@
config_block = 'sensors'
def run(self, ctx: TestRun) -> None:
- for node in ctx.nodes:
- node_id = node.info.node_id()
- if node_id in ctx.sensors_run_on:
-
- data, collected_at = node.conn.sensors.stop() # type: Dict[Tuple[str, str], List[int]], List[float]
-
- mstore = ctx.storage.sub_storage("metric", node_id)
- for (source_name, sensor_name), values in data.items():
- mstore[source_name, sensor_name] = values
- mstore["collected_at"] = collected_at
+ collect_sensors_data(ctx, True)
# def delta(func, only_upd=True):