blob: 4f00a3e063abad66ccba78c6907b03f65c66f69a [file] [log] [blame]
koder aka kdanilov962ee5f2016-12-19 02:40:08 +02001import array
2import logging
3from typing import List, Dict, Tuple
koder aka kdanilov39e449e2016-12-17 15:15:26 +02004
koder aka kdanilov962ee5f2016-12-19 02:40:08 +02005from . import utils
koder aka kdanilov70227062016-11-26 23:23:21 +02006from .test_run_class import TestRun
7from . import sensors_rpc_plugin
koder aka kdanilov39e449e2016-12-17 15:15:26 +02008from .stage import Stage, StepOrder
koder aka kdanilov70227062016-11-26 23:23:21 +02009
# Path of the plugin's ".py" file: the extension of the loaded module file is
# replaced with ".py" (presumably so a compiled-module path maps back to the
# source file -- confirm if other loaders are in use).
plugin_fname = sensors_rpc_plugin.__file__.rsplit(".", 1)[0] + ".py"

# Raw bytes of the plugin source; uploaded to nodes by StartSensorsStage.
# Read inside a context manager so the file handle is closed right away
# instead of being left for the garbage collector.
with open(plugin_fname, "rb") as _plugin_fd:
    SENSORS_PLUGIN_CODE = _plugin_fd.read()


logger = logging.getLogger("wally")
koder aka kdanilov70227062016-11-26 23:23:21 +020015
koder aka kdanilov962ee5f2016-12-19 02:40:08 +020016
# TODO(koder): if a node has more than one role, its sensor settings might be incorrect
class StartSensorsStage(Stage):
    """Upload the sensors RPC plugin to nodes and start monitoring on them.

    The per-role sensor selection is read from
    ``ctx.config.sensors.roles_mapping``.  Every node whose roles select at
    least one sensor gets the plugin uploaded, is recorded in
    ``ctx.sensors_run_on`` and has its sensors started.
    """

    priority = StepOrder.START_SENSORS
    config_block = 'sensors'

    @staticmethod
    def _default_sensor_settings(names: List[str]) -> Dict[str, object]:
        """Map sensor names to default settings.

        Plain sensors get the ".*" pattern.  The 'ceph' sensor gets an empty
        dict instead, because ``run`` later fills it with per-node settings
        via ``dict.update`` -- a string value would fail there.
        """
        return {name: ({} if name == 'ceph' else ".*") for name in names}

    def run(self, ctx: TestRun) -> None:
        """Build the sensor config for every node and start its sensors.

        Raises:
            utils.StopTestError: if ``array.array('L')`` items are not
                8 bytes wide on this platform.
        """
        itemsize = array.array('L').itemsize
        if itemsize != 8:
            message = "Python array.array('L') items should be 8 bytes in size, not {}." + \
                      " Can't provide sensors on this platform. Disable sensors in config and retry"
            logger.critical(message.format(itemsize))
            raise utils.StopTestError()

        per_role_config = {}  # type: Dict[str, dict]

        for name, val in ctx.config.sensors.roles_mapping.raw().items():
            # Shorthand forms: a comma separated string or a list of sensor names.
            # Any other value is stored as-is.
            if isinstance(val, str):
                val = self._default_sensor_settings([vl.strip() for vl in val.split(",")])
            elif isinstance(val, list):
                val = self._default_sensor_settings(val)
            per_role_config[name] = val

        if 'all' in per_role_config:
            all_vl = per_role_config.pop('all')

            # 'all' must reach every role: the ones listed in the config and
            # the ones which only appear on nodes.
            all_roles = set(per_role_config)
            for node in ctx.nodes:
                all_roles.update(node.info.roles)

            for name in all_roles:
                # role-specific settings override the 'all' settings
                new_vals = all_vl.copy()
                new_vals.update(per_role_config.get(name, {}))
                per_role_config[name] = new_vals

        for node in ctx.nodes:
            node_cfg = {}  # type: Dict[str, object]
            for role in node.info.roles:
                node_cfg.update(per_role_config.get(role, {}))

            nid = node.info.node_id()
            if not node_cfg:
                logger.debug("Skip monitoring node %s, as no sensors selected", nid)
                continue

            # ceph requires additional settings
            if 'ceph' in node_cfg:
                # Work on a per-node copy: the dict stored in per_role_config is
                # shared between nodes, so updating it in place would leak one
                # node's ceph params into the next node's config.
                base_ceph_cfg = node_cfg['ceph']
                ceph_cfg = dict(base_ceph_cfg) if isinstance(base_ceph_cfg, dict) else {}
                ceph_cfg.update(node.info.params['ceph'])
                ceph_cfg['osds'] = [osd.id for osd in node.info.params['ceph-osds']]
                node_cfg['ceph'] = ceph_cfg

            logger.debug("Setting up sensort RPC plugin for node %s", nid)
            node.upload_plugin("sensors", SENSORS_PLUGIN_CODE)
            ctx.sensors_run_on.add(nid)
            logger.debug("Start monitoring node %s", nid)
            node.conn.sensors.start(node_cfg)
69
70
def collect_sensors_data(ctx: TestRun, stop: bool = False):
    """Fetch sensor data from every monitored node and append it to storage.

    Only nodes whose id is in ``ctx.sensors_run_on`` are queried.  With
    ``stop`` set the node's ``sensors.stop`` call is used, otherwise
    ``sensors.get_updates``; both are expected to return a
    ``(data, collected_at_bytes)`` pair.
    """
    for node in ctx.nodes:
        node_id = node.info.node_id()
        if node_id not in ctx.sensors_run_on:
            continue

        fetch = node.conn.sensors.stop if stop else node.conn.sensors.get_updates
        raw_series, raw_times = fetch()

        # collection times arrive as a packed buffer of 'f' items
        timestamps = array.array('f')
        timestamps.frombytes(raw_times)

        metric_storage = ctx.storage.sub_storage("metric", node_id)
        for (source_name, sensor_name), raw_values in raw_series.items():
            # sensor values arrive as a packed buffer of 'Q' items
            samples = array.array('Q')
            samples.frombytes(raw_values)
            metric_storage.append(samples, source_name, sensor_name.decode("utf8"))
        metric_storage.append(timestamps, "collected_at")
koder aka kdanilov70227062016-11-26 23:23:21 +020092
93
class CollectSensorsStage(Stage):
    """Stage which gathers sensor data from the monitored nodes, stopping the sensors."""

    priority = StepOrder.COLLECT_SENSORS
    config_block = 'sensors'

    def run(self, ctx: TestRun) -> None:
        """Run ``collect_sensors_data`` for ``ctx`` with ``stop`` enabled."""
        collect_sensors_data(ctx, stop=True)
koder aka kdanilov70227062016-11-26 23:23:21 +0200100
101
102# def delta(func, only_upd=True):
103# prev = {}
104# while True:
105# for dev_name, vals in func():
106# if dev_name not in prev:
107# prev[dev_name] = {}
108# for name, (val, _) in vals.items():
109# prev[dev_name][name] = val
110# else:
111# dev_prev = prev[dev_name]
112# res = {}
113# for stat_name, (val, accum_val) in vals.items():
114# if accum_val:
115# if stat_name in dev_prev:
116# delta = int(val) - int(dev_prev[stat_name])
117# if not only_upd or 0 != delta:
118# res[stat_name] = str(delta)
119# dev_prev[stat_name] = val
120# elif not only_upd or '0' != val:
121# res[stat_name] = val
122#
123# if only_upd and len(res) == 0:
124# continue
125# yield dev_name, res
126# yield None, None
127#
128#
129
130