a lot of changes
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
index 20135b6..81a2832 100644
--- a/wally/sensors_utils.py
+++ b/wally/sensors_utils.py
@@ -26,21 +26,19 @@
break
addr, data = val
-
if addr not in observed_nodes:
mon_q.put(addr + (data['source_id'],))
observed_nodes.add(addr)
fd.write(repr((addr, data)) + "\n")
- source_id = data.pop('source_id')
- rep_time = data.pop('time')
-
- if 'testnode' in source2roles_map.get(source_id, []):
- sum_io_q = 0
- data_store.update_values(rep_time,
- {"testnodes:io": sum_io_q},
- add=True)
+ # source_id = data.pop('source_id')
+ # rep_time = data.pop('time')
+ # if 'testnode' in source2roles_map.get(source_id, []):
+ # sum_io_q = 0
+ # data_store.update_values(rep_time,
+ # {"testnodes:io": sum_io_q},
+ # add=True)
except Exception:
logger.exception("Error in sensors thread")
logger.info("Sensors thread exits")
@@ -62,8 +60,8 @@
for node in nodes:
if role in node.roles:
- if node.monitor_url is not None:
- monitor_url = node.monitor_url
+ if node.monitor_ip is not None:
+ monitor_url = receiver_url.format(ip=node.monitor_ip)
else:
ip = node.get_ip()
ext_ip = utils.get_ip_for_target(ip)
@@ -104,7 +102,8 @@
return mon_q
-def deploy_sensors_stage(cfg, ctx, nodes=None, undeploy=True):
+def deploy_sensors_stage(cfg, ctx, nodes=None, undeploy=True,
+ recv_timeout=10, ignore_nodata=False):
if 'sensors' not in cfg:
return
@@ -134,18 +133,22 @@
ctx.clear_calls_stack.append(remove_sensors_stage)
- logger.info("Deploing new sensors on {0} node(s)".format(len(nodes)))
+ num_monitoref_nodes = len(sensors_configs)
+ logger.info("Deploing new sensors on {0} node(s)".format(
+ num_monitoref_nodes))
+
deploy_and_start_sensors(sensors_configs)
- wait_for_new_sensors_data(ctx, monitored_nodes)
+ wait_for_new_sensors_data(ctx, monitored_nodes, recv_timeout,
+ ignore_nodata)
-def wait_for_new_sensors_data(ctx, monitored_nodes):
- MAX_WAIT_FOR_SENSORS = 10
- etime = time.time() + MAX_WAIT_FOR_SENSORS
+def wait_for_new_sensors_data(ctx, monitored_nodes, recv_timeout,
+ ignore_nodata):
+ etime = time.time() + recv_timeout
msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
nodes_ids = set(node.get_conn_id() for node in monitored_nodes)
- logger.debug(msg.format(MAX_WAIT_FOR_SENSORS, len(nodes_ids)))
+ logger.debug(msg.format(recv_timeout, len(nodes_ids)))
# wait till all nodes start sending data
while len(nodes_ids) != 0:
@@ -153,9 +156,12 @@
try:
source_id = ctx.sensors_mon_q.get(True, tleft)[2]
except Queue.Empty:
- msg = "Node {0} not sending any sensor data in {1}s"
- msg = msg.format(", ".join(nodes_ids), MAX_WAIT_FOR_SENSORS)
- raise RuntimeError(msg)
+ if not ignore_nodata:
+ msg = "Node(s) {0} not sending any sensor data in {1}s"
+ msg = msg.format(", ".join(nodes_ids), recv_timeout)
+ raise RuntimeError(msg)
+ else:
+ return
if source_id not in nodes_ids:
msg = "Receive sensors from extra node: {0}".format(source_id)