blob: 81a283270bb62e75242b9374588bce6e051e8890 [file] [log] [blame]
koder aka kdanilov168f6092015-04-19 02:33:38 +03001import time
2import Queue
3import logging
4import threading
5
6from wally import utils
7from wally.config import cfg_dict
8from wally.sensors.api import (start_listener_thread,
9 deploy_and_start_sensors,
10 SensorConfig,
11 stop_and_remove_sensors)
12
# Logger shared by all sensor-related helpers in this module.
logger = logging.getLogger("wally.sensors")
# Receiver URL template used when the sensors config has no "receiver_url".
# "{ip}" is filled with a per-node address when building sensor configs and
# with "0.0.0.0" when starting the local listener.
DEFAULT_RECEIVER_URL = "udp://{ip}:5699"
koder aka kdanilov168f6092015-04-19 02:33:38 +030015
16
def save_sensors_data(data_q, mon_q, fd, data_store, source2roles_map):
    """Drain sensor packets from data_q and append them to fd.

    Intended to run as the body of a background thread. It loops until a
    ``None`` sentinel is read from data_q.

    Each queue item is unpacked as an ``(addr, data)`` pair. The first time
    a given ``addr`` is seen, ``addr + (data['source_id'],)`` is put on
    mon_q, so that waiters can learn which sources have started reporting.
    Every packet is written to fd as one ``repr((addr, data))`` line.

    :param data_q: queue of ``(addr, data)`` items, terminated by ``None``
    :param mon_q: queue receiving one notification per newly seen ``addr``
    :param fd: writable text file that receives the raw packet log
    :param data_store: currently unused; kept for interface compatibility
    :param source2roles_map: currently unused; kept for interface
        compatibility

    Any exception stops the loop. It is logged rather than propagated,
    since nobody joins on the thread's result.
    """
    fd.write("\n")

    observed_nodes = set()

    try:
        while True:
            val = data_q.get()
            if val is None:
                break

            addr, data = val
            if addr not in observed_nodes:
                # addr is concatenated with a 1-tuple, so it must itself be
                # a tuple (presumably (ip, port) -- confirm with the listener)
                mon_q.put(addr + (data['source_id'],))
                observed_nodes.add(addr)

            fd.write(repr((addr, data)) + "\n")

        # make sure everything received so far reaches the storage file
        # before the thread reports that it is done
        fd.flush()
    except Exception:
        logger.exception("Error in sensors thread")
    logger.info("Sensors thread exits")
45
46
47def get_sensors_config_for_nodes(cfg, nodes):
48 monitored_nodes = []
49 sensors_configs = []
koder aka kdanilov57ce4db2015-04-25 21:25:51 +030050 source2roles_map = {}
koder aka kdanilov168f6092015-04-19 02:33:38 +030051
koder aka kdanilova4a570f2015-04-23 22:11:40 +030052 receiver_url = cfg.get("receiver_url", DEFAULT_RECEIVER_URL)
koder aka kdanilov168f6092015-04-19 02:33:38 +030053 assert '{ip}' in receiver_url
54
55 for role, sensors_str in cfg["roles_mapping"].items():
56 sensors = [sens.strip() for sens in sensors_str.split(",")]
57
58 collect_cfg = dict((sensor, {}) for sensor in sensors)
59
60 for node in nodes:
61 if role in node.roles:
62
koder aka kdanilovf86d7af2015-05-06 04:01:54 +030063 if node.monitor_ip is not None:
64 monitor_url = receiver_url.format(ip=node.monitor_ip)
koder aka kdanilov168f6092015-04-19 02:33:38 +030065 else:
koder aka kdanilov0c598a12015-04-21 03:01:40 +030066 ip = node.get_ip()
koder aka kdanilov209e85d2015-04-27 23:11:05 +030067 ext_ip = utils.get_ip_for_target(ip)
koder aka kdanilov168f6092015-04-19 02:33:38 +030068 monitor_url = receiver_url.format(ip=ext_ip)
69
koder aka kdanilov57ce4db2015-04-25 21:25:51 +030070 source2roles_map[node.get_conn_id()] = node.roles
koder aka kdanilov168f6092015-04-19 02:33:38 +030071 monitored_nodes.append(node)
72 sens_cfg = SensorConfig(node.connection,
73 node.get_conn_id(),
74 collect_cfg,
75 source_id=node.get_conn_id(),
76 monitor_url=monitor_url)
77 sensors_configs.append(sens_cfg)
78
koder aka kdanilov57ce4db2015-04-25 21:25:51 +030079 return monitored_nodes, sensors_configs, source2roles_map
koder aka kdanilov168f6092015-04-19 02:33:38 +030080
81
def start_sensor_process_thread(ctx, cfg, sensors_configs, source2roles_map):
    """Start the background thread that stores incoming sensor data.

    The function does the following:
    - Starts a listener on the receiver URL with "{ip}" set to "0.0.0.0".
    - Opens ``cfg_dict['sensor_storage']`` for writing.
    - Runs save_sensors_data in a daemon thread.
    - Registers a stop callback on ``ctx.clear_calls_stack`` that shuts
      everything down again.

    :param ctx: run context; ``ctx.sensors_data`` is handed to the writer
        thread and ``ctx.clear_calls_stack`` receives the stop callback.
    :param cfg: sensors config section (only "receiver_url" is read).
    :param sensors_configs: currently unused; kept for interface
        compatibility.
    :param source2roles_map: passed through to save_sensors_data.
    :returns: the queue on which save_sensors_data announces each newly
        seen source.
    """
    receiver_url = cfg.get('receiver_url', DEFAULT_RECEIVER_URL)
    sensors_data_q, stop_sensors_loop = \
        start_listener_thread(receiver_url.format(ip='0.0.0.0'))

    mon_q = Queue.Queue()
    fd = open(cfg_dict['sensor_storage'], "w")

    params = sensors_data_q, mon_q, fd, ctx.sensors_data, source2roles_map
    sensor_listen_th = threading.Thread(None, save_sensors_data, None,
                                        params)
    sensor_listen_th.daemon = True
    sensor_listen_th.start()

    def stop_sensors_receiver(cfg, ctx):
        stop_sensors_loop()
        # None is the shutdown sentinel understood by save_sensors_data
        sensors_data_q.put(None)
        sensor_listen_th.join()
        # The writer thread has exited, so nothing else uses the file.
        # It was previously never closed.
        fd.close()

    ctx.clear_calls_stack.append(stop_sensors_receiver)
    return mon_q
103
104
def deploy_sensors_stage(cfg, ctx, nodes=None, undeploy=True,
                         recv_timeout=10, ignore_nodata=False):
    """Deploy and start sensors on nodes, then wait until they report.

    Does nothing when cfg has no 'sensors' section, or when no node matches
    a role from ``cfg['sensors']['roles_mapping']``. The shared receiver
    thread is started only if ``ctx.sensors_mon_q`` is still None.

    :param cfg: full run config; only ``cfg['sensors']`` is used here.
    :param ctx: run context; reads ``ctx.nodes`` and ``ctx.sensors_mon_q``
        and appends cleanup callbacks to ``ctx.clear_calls_stack``.
    :param nodes: nodes to consider; defaults to ``ctx.nodes``.
    :param undeploy: if true, register a cleanup callback that stops and
        removes the sensors on these nodes.
    :param recv_timeout: seconds to wait for every monitored node to
        start reporting.
    :param ignore_nodata: if true, return quietly instead of raising when
        some nodes stay silent past recv_timeout.
    :raises RuntimeError: propagated from wait_for_new_sensors_data when
        nodes stay silent and ignore_nodata is false.
    """
    if 'sensors' not in cfg:
        return

    sensors_cfg = cfg.get('sensors')

    if nodes is None:
        nodes = ctx.nodes

    monitored_nodes, sensors_configs, source2roles_map = \
        get_sensors_config_for_nodes(sensors_cfg, nodes)

    if not monitored_nodes:
        logger.info("Nothing to monitor, no sensors would be installed")
        return

    if ctx.sensors_mon_q is None:
        logger.info("Start sensors data receiving thread")
        ctx.sensors_mon_q = start_sensor_process_thread(ctx, sensors_cfg,
                                                        sensors_configs,
                                                        source2roles_map)

    if undeploy:
        def remove_sensors_stage(cfg, ctx):
            # The configs are rebuilt from the config given at cleanup time.
            # It is indexed with ['sensors'], so the full config is expected.
            _, configs, _ = \
                get_sensors_config_for_nodes(cfg['sensors'], nodes)
            stop_and_remove_sensors(configs)

        ctx.clear_calls_stack.append(remove_sensors_stage)

    # counts configs: a node matching several roles is counted per role
    num_monitored_nodes = len(sensors_configs)
    logger.info("Deploying new sensors on %s node(s)", num_monitored_nodes)

    deploy_and_start_sensors(sensors_configs)
    wait_for_new_sensors_data(ctx, monitored_nodes, recv_timeout,
                              ignore_nodata)
koder aka kdanilov168f6092015-04-19 02:33:38 +0300143
144
def wait_for_new_sensors_data(ctx, monitored_nodes, recv_timeout,
                              ignore_nodata):
    """Block until every monitored node has started reporting sensor data.

    Reads notifications from ``ctx.sensors_mon_q``. Element ``[2]`` of each
    item is taken as the reporting source id and matched against
    ``node.get_conn_id()`` of monitored_nodes.

    :param ctx: run context holding ``sensors_mon_q``.
    :param monitored_nodes: nodes expected to start reporting.
    :param recv_timeout: overall time budget in seconds, shared by all
        nodes.
    :param ignore_nodata: if true, return quietly when the budget runs out.
    :raises RuntimeError: if some nodes have not reported within
        recv_timeout seconds and ignore_nodata is false.
    """
    etime = time.time() + recv_timeout

    msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
    nodes_ids = set(node.get_conn_id() for node in monitored_nodes)
    logger.debug(msg.format(recv_timeout, len(nodes_ids)))

    # wait till all nodes start sending data
    while nodes_ids:
        # Clamp at zero: Queue.get raises ValueError for a negative timeout.
        # A zero timeout raises Queue.Empty as intended.
        tleft = max(0, etime - time.time())
        try:
            source_id = ctx.sensors_mon_q.get(True, tleft)[2]
        except Queue.Empty:
            if not ignore_nodata:
                msg = "Node(s) {0} not sending any sensor data in {1}s"
                msg = msg.format(", ".join(nodes_ids), recv_timeout)
                raise RuntimeError(msg)
            return

        if source_id in nodes_ids:
            nodes_ids.remove(source_id)
        else:
            # An unexpected source is only reported. Removing it from
            # nodes_ids would raise KeyError.
            msg = "Receive sensors from extra node: {0}".format(source_id)
            logger.warning(msg)