fixes: take sensor units from RPC updates, log downloaded sensors data size, remove dead code
diff --git a/wally/sensors.py b/wally/sensors.py
index 60830a1..89bc224 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -1,7 +1,7 @@
import bz2
import array
import logging
-from typing import List, Dict, Tuple
+from typing import Dict
import numpy
@@ -21,37 +21,6 @@
logger = logging.getLogger("wally")
-sensor_units = {
- "system-cpu.idle": "",
- "system-cpu.nice": "",
- "system-cpu.user": "",
- "system-cpu.sys": "",
- "system-cpu.iowait": "",
- "system-cpu.irq": "",
- "system-cpu.sirq": "",
- "system-cpu.steal": "",
- "system-cpu.guest": "",
-
- "system-cpu.procs_blocked": "",
- "system-cpu.procs_queue_x10": "",
-
- "net-io.recv_bytes": "B",
- "net-io.recv_packets": "",
- "net-io.send_bytes": "B",
- "net-io.send_packets": "",
-
- "block-io.io_queue": "",
- "block-io.io_time": "ms",
- "block-io.reads_completed": "",
- "block-io.rtime": "ms",
- "block-io.sectors_read": "B",
- "block-io.sectors_written": "B",
- "block-io.writes_completed": "",
- "block-io.wtime": "ms",
- "block-io.weighted_io_time": "ms"
-}
-
-
# TODO(koder): in case if node has more than one role sensor settings might be incorrect
class StartSensorsStage(Stage):
priority = StepOrder.START_SENSORS
@@ -118,27 +87,29 @@
def collect_sensors_data(ctx: TestRun, stop: bool = False):
rstorage = ResultStorage(ctx.storage)
+ total_sz = 0
+
+ logger.info("Start loading sensors")
for node in ctx.nodes:
node_id = node.node_id
if node_id in ctx.sensors_run_on:
+ func = node.conn.sensors.stop if stop else node.conn.sensors.get_updates
- if stop:
- func = node.conn.sensors.stop
- else:
- func = node.conn.sensors.get_updates
+ # hack to calculate total transferred size
+ offset_map, compressed_blob, compressed_collected_at_b = func()
+ data_tpl = (offset_map, compressed_blob, compressed_collected_at_b)
- # TODO: units should came along with data
- # TODO: process raw sensors data
+ total_sz += len(compressed_blob) + len(compressed_collected_at_b) + sum(map(len, offset_map)) + \
+ 16 * len(offset_map)
- for path, value, is_array in sensors_rpc_plugin.unpack_rpc_updates(func()):
+ for path, value, is_array, units in sensors_rpc_plugin.unpack_rpc_updates(data_tpl):
if path == 'collected_at':
ds = DataSource(node_id=node_id, metric='collected_at', tag='csv')
- rstorage.append_sensor(numpy.array(value), ds, 'us')
+ rstorage.append_sensor(numpy.array(value), ds, units)
else:
sensor, dev, metric = path.split(".")
ds = DataSource(node_id=node_id, metric=metric, dev=dev, sensor=sensor, tag='csv')
if is_array:
- units = sensor_units["{}.{}".format(sensor, metric)]
rstorage.append_sensor(numpy.array(value), ds, units)
else:
if metric == 'historic':
@@ -146,6 +117,7 @@
else:
assert metric in ('perf_dump', 'historic_js')
rstorage.put_sensor_raw(value, ds(tag='js'))
+ logger.info("Download %sB of sensors data", utils.b2ssize(total_sz))
@@ -156,33 +128,3 @@
def run(self, ctx: TestRun) -> None:
collect_sensors_data(ctx, True)
-
-# def delta(func, only_upd=True):
-# prev = {}
-# while True:
-# for dev_name, vals in func():
-# if dev_name not in prev:
-# prev[dev_name] = {}
-# for name, (val, _) in vals.items():
-# prev[dev_name][name] = val
-# else:
-# dev_prev = prev[dev_name]
-# res = {}
-# for stat_name, (val, accum_val) in vals.items():
-# if accum_val:
-# if stat_name in dev_prev:
-# delta = int(val) - int(dev_prev[stat_name])
-# if not only_upd or 0 != delta:
-# res[stat_name] = str(delta)
-# dev_prev[stat_name] = val
-# elif not only_upd or '0' != val:
-# res[stat_name] = val
-#
-# if only_upd and len(res) == 0:
-# continue
-# yield dev_name, res
-# yield None, None
-#
-#
-
-