blob: e5b434711f14faa3a12bd2c7f3aa155b60a9c2b0 [file] [log] [blame]
import time
import array
import Queue
import logging
import threading
from wally import utils
from wally.config import cfg_dict
from wally.sensors.api import (start_listener_thread,
deploy_and_start_sensors,
SensorConfig,
stop_and_remove_sensors)
logger = logging.getLogger("wally.sensors")
DEFAULT_RECEIVER_URL = "udp://{ip}:5699"
class SensorDatastore(object):
    """Thread-safe in-memory store of per-second sensor samples.

    Each named series is a flat array with one slot per second; slot i
    holds the sample for unix time ``self.stime + i``.  When a series
    grows past ``max_size`` entries, all series are trimmed back to
    ``min_size`` and ``stime`` is advanced to match.
    """

    def __init__(self, stime=None):
        # stime -- unix timestamp of slot 0 in every stored array;
        # set lazily by set_values_l() on the first data if None.
        self.lock = threading.Lock()
        self.stime = stime

        # keep between one hour and one hour + one minute of samples
        self.min_size = 60 * 60
        self.max_size = 60 * 61

        # typecode "H" (unsigned short) for consistency with
        # set_values_l(), which creates new series with "H" -- the old
        # "B" capped samples at 255
        self.data = {
            'testnodes:io': array.array("H"),
            'testnodes:cpu': array.array("H"),
        }

    def get_values(self, name, start, end):
        """Return samples of series *name* for the half-open range
        [start, end) of unix timestamps.

        Seconds that predate the first measurement are zero-padded.
        Raises KeyError if *name* is unknown.
        """
        assert end >= start
        if end == start:
            return []

        with self.lock:
            curr_arr = self.data[name]
            if self.stime is None:
                # no data received yet at all
                return []

            sidx = start - self.stime
            eidx = end - self.stime

            if sidx < 0 and eidx < 0:
                # whole requested range predates measurements
                return [0] * (end - start)
            elif sidx < 0:
                # bug fix: list + array.array raises TypeError, the
                # stored slice must be converted to a list first
                return [0] * (-sidx) + list(curr_arr[:eidx])
            return curr_arr[sidx:eidx]

    def set_values(self, start_time, vals):
        """Store *vals* (series name -> sample list starting at
        *start_time*) under the lock."""
        with self.lock:
            # bug fix: used to call the non-existent self.add_values_l
            return self.set_values_l(start_time, vals)

    def set_values_l(self, start_time, vals):
        """Lock-free worker for set_values -- caller must hold self.lock.

        Gaps between stored and new data are zero-filled; overlapping
        timestamps overwrite previously stored samples.
        """
        max_cut = 0
        for name, values in vals.items():
            curr_arr = self.data.setdefault(name, array.array("H"))

            if self.stime is None:
                # first data ever -- anchor the epoch here
                self.stime = start_time

            curr_end_time = len(curr_arr) + self.stime

            if curr_end_time < start_time:
                # gap between stored and new data -- zero-fill it
                curr_arr.extend([0] * (start_time - curr_end_time))
                curr_arr.extend(values)
            elif curr_end_time > start_time:
                logger.warning("Duplicated sensors data")
                sindex = len(curr_arr) + (start_time - curr_end_time)
                if sindex < 0:
                    values = values[-sindex:]
                    sindex = 0
                    logger.warning("Some data with timestamp before"
                                   " beginning of measurememts. Skip them")
                # bug fix: slice assignment into array.array requires an
                # array of the same typecode, not a plain list
                patch = array.array(curr_arr.typecode, values)
                curr_arr[sindex:sindex + len(values)] = patch
            else:
                curr_arr.extend(values)

            if len(curr_arr) > self.max_size:
                max_cut = max(len(curr_arr) - self.min_size, max_cut)

        if max_cut > 0:
            # bug fix: used to bump the non-existent self.start_time and
            # trimmed the caller's input dict instead of the stored arrays
            self.stime += max_cut
            for arr in self.data.values():
                del arr[:max_cut]
def save_sensors_data(data_q, mon_q, fd, data_store, source2roles_map):
    """Consume raw sensor messages from *data_q* until a None sentinel.

    Every message is dumped as repr() to *fd*; the first message from
    each node address is announced on *mon_q*; per-second aggregates for
    test nodes are flushed into *data_store* once they are older than
    BUFFER seconds (i.e. unlikely to receive further updates).
    """
    fd.write("\n")

    # seconds to keep an aggregate in memory before flushing, to let
    # late-arriving samples for the same second be merged in
    BUFFER = 3
    observed_nodes = set()
    testnodes_data = {
        'io': {},
        'cpu': {},
    }

    try:
        while True:
            val = data_q.get()
            if val is None:
                # shutdown sentinel from stop_sensors_receiver
                break

            addr, data = val

            if addr not in observed_nodes:
                # announce first contact so the deploy stage can stop
                # waiting for this node
                mon_q.put(addr + (data['source_id'],))
                observed_nodes.add(addr)

            fd.write(repr((addr, data)) + "\n")

            source_id = data.pop('source_id')
            rep_time = data.pop('time')

            if 'testnode' in source2roles_map.get(source_id, []):
                # NOTE(review): nothing from `data` is ever added here, so
                # the aggregate stays at its default 0 -- the actual io
                # queue summation appears to have been lost; confirm
                # against history before relying on these values.
                sum_io_q = testnodes_data['io'].get(rep_time, 0)
                testnodes_data['io'][rep_time] = sum_io_q

            etime = time.time() - BUFFER
            for name, vals in testnodes_data.items():
                new_vals = {}
                for rtime, value in vals.items():
                    if rtime < etime:
                        # bug fix: set_values(start_time, vals) takes a
                        # timestamp and a mapping -- the old call passed
                        # three args (TypeError) and always targeted the
                        # "testnodes:io" series even for cpu data
                        data_store.set_values(
                            rtime, {"testnodes:" + name: [value]})
                    else:
                        new_vals[rtime] = value

                vals.clear()
                vals.update(new_vals)
    except Exception:
        logger.exception("Error in sensors thread")

    logger.info("Sensors thread exits")
def get_sensors_config_for_nodes(cfg, nodes):
    """Build a SensorConfig for every node whose role is monitored.

    Returns a (monitored_nodes, sensors_configs, source2roles_map)
    tuple, where source2roles_map maps each node's connection id to its
    role list.  A node matching several roles is configured once per
    matching role.
    """
    monitored_nodes = []
    sensors_configs = []
    source2roles_map = {}

    receiver_url = cfg.get("receiver_url", DEFAULT_RECEIVER_URL)
    assert '{ip}' in receiver_url

    for role, sensors_str in cfg["roles_mapping"].items():
        # "sensor1, sensor2" -> {"sensor1": {}, "sensor2": {}}
        collect_cfg = {sens.strip(): {} for sens in sensors_str.split(",")}

        for node in nodes:
            if role not in node.roles:
                continue

            if node.monitor_url is not None:
                monitor_url = node.monitor_url
            else:
                ip = node.get_ip()
                # from localhost report back to localhost; otherwise use
                # our address that is routable from the monitored node
                if ip == '127.0.0.1':
                    ext_ip = '127.0.0.1'
                else:
                    ext_ip = utils.get_ip_for_target(ip)
                monitor_url = receiver_url.format(ip=ext_ip)

            conn_id = node.get_conn_id()
            source2roles_map[conn_id] = node.roles
            monitored_nodes.append(node)
            sensors_configs.append(
                SensorConfig(node.connection,
                             conn_id,
                             collect_cfg,
                             source_id=conn_id,
                             monitor_url=monitor_url))

    return monitored_nodes, sensors_configs, source2roles_map
def start_sensor_process_thread(ctx, cfg, sensors_configs, source2roles_map):
    """Start the UDP listener and the storage worker thread.

    Opens the sensor storage file, spawns save_sensors_data() as a
    daemon thread and registers a cleanup callback on
    ctx.clear_calls_stack that stops the listener, drains the worker and
    closes the file.  Returns the queue on which (addr..., source_id)
    first-contact announcements arrive.
    """
    receiver_url = cfg.get('receiver_url', DEFAULT_RECEIVER_URL)
    sensors_data_q, stop_sensors_loop = \
        start_listener_thread(receiver_url.format(ip='0.0.0.0'))

    mon_q = Queue.Queue()
    fd = open(cfg_dict['sensor_storage'], "w")

    params = sensors_data_q, mon_q, fd, ctx.sensors_data, source2roles_map
    sensor_listen_th = threading.Thread(None, save_sensors_data, None,
                                        params)
    sensor_listen_th.daemon = True
    sensor_listen_th.start()

    def stop_sensors_receiver(cfg, ctx):
        stop_sensors_loop()
        # None is the shutdown sentinel save_sensors_data waits for
        sensors_data_q.put(None)
        sensor_listen_th.join()
        # bug fix: the storage file was opened but never closed
        fd.close()

    ctx.clear_calls_stack.append(stop_sensors_receiver)
    return mon_q
def deploy_sensors_stage(cfg, ctx, nodes=None, undeploy=True):
    """Deploy and start sensors on every node with a monitored role.

    Starts the shared receiver thread on first use; when *undeploy* is
    true, registers a cleanup stage that removes the sensors again.
    Does nothing if the config has no 'sensors' section or no node
    matches a monitored role.
    """
    if 'sensors' not in cfg:
        return

    sensors_cfg = cfg.get('sensors')
    target_nodes = ctx.nodes if nodes is None else nodes

    monitored_nodes, sensors_configs, source2roles_map = \
        get_sensors_config_for_nodes(sensors_cfg, target_nodes)

    if not monitored_nodes:
        logger.info("Nothing to monitor, no sensors would be installed")
        return

    if ctx.sensors_mon_q is None:
        # receiver thread is shared by all deploy invocations
        logger.info("Start sensors data receiving thread")
        ctx.sensors_mon_q = start_sensor_process_thread(ctx, sensors_cfg,
                                                        sensors_configs,
                                                        source2roles_map)

    if undeploy:
        def remove_sensors_stage(cfg, ctx):
            _, sensors_configs, _ = \
                get_sensors_config_for_nodes(cfg['sensors'], target_nodes)
            stop_and_remove_sensors(sensors_configs)

        ctx.clear_calls_stack.append(remove_sensors_stage)

    logger.info("Deploing new sensors on {0} node(s)".format(
        len(target_nodes)))
    deploy_and_start_sensors(sensors_configs)
    wait_for_new_sensors_data(ctx, monitored_nodes)
def wait_for_new_sensors_data(ctx, monitored_nodes):
    """Block until every monitored node has reported at least one sample.

    Reads first-contact announcements from ctx.sensors_mon_q.  Raises
    RuntimeError if some node stays silent for MAX_WAIT_FOR_SENSORS
    seconds.
    """
    MAX_WAIT_FOR_SENSORS = 10
    etime = time.time() + MAX_WAIT_FOR_SENSORS

    msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
    nodes_ids = set(node.get_conn_id() for node in monitored_nodes)
    logger.debug(msg.format(MAX_WAIT_FOR_SENSORS, len(nodes_ids)))

    # wait till all nodes start sending data
    while len(nodes_ids) != 0:
        tleft = etime - time.time()
        try:
            # announcement tuple is (addr..., source_id); [2] is source_id
            source_id = ctx.sensors_mon_q.get(True, tleft)[2]
        except Queue.Empty:
            msg = "Node {0} not sending any sensor data in {1}s"
            msg = msg.format(", ".join(nodes_ids), MAX_WAIT_FOR_SENSORS)
            raise RuntimeError(msg)

        if source_id not in nodes_ids:
            msg = "Receive sensors from extra node: {0}".format(source_id)
            logger.warning(msg)

        # bug fix: plain remove() raised KeyError right after the
        # "extra node" warning above; discard() ignores unknown ids
        nodes_ids.discard(source_id)