blob: 65de0ef7ca14e0ad802e35cf9a646d6ad476022c [file] [log] [blame]
koder aka kdanilovd5ed4da2015-05-07 23:33:23 +03001import csv
koder aka kdanilov168f6092015-04-19 02:33:38 +03002import time
3import Queue
4import logging
5import threading
6
7from wally import utils
8from wally.config import cfg_dict
9from wally.sensors.api import (start_listener_thread,
10 deploy_and_start_sensors,
11 SensorConfig,
12 stop_and_remove_sensors)
13
# Module-level logger for the sensors stages and the receiver thread.
logger = logging.getLogger("wally.sensors")
# Receiver URL template used when the sensors config has no "receiver_url";
# the "{ip}" placeholder is filled in with str.format(ip=...).
DEFAULT_RECEIVER_URL = "udp://{ip}:5699"
koder aka kdanilov168f6092015-04-19 02:33:38 +030016
17
def save_sensors_data(data_q, mon_q, fd, data_store, source2roles_map):
    """Consume sensor records from ``data_q`` and append them to ``fd`` as CSV.

    Runs until a ``None`` sentinel is read from ``data_q``. Every other item
    is an ``(addr, data)`` pair: ``addr`` is a tuple whose first two items
    start every CSV row, ``data`` is a dict that must contain at least
    'time', 'source_id' and 'hostname'.

    The first valid record seen from an ``addr``:
      * is announced on ``mon_q`` as ``addr + (source_id,)``;
      * fixes the (sorted) list of extra fields recorded for that ``addr``;
      * emits a header row ``addr[0], addr[1], source_id, hostname, field...``.
    Every valid record then emits a data row
    ``addr[0], addr[1], time, value...`` in header field order.

    Malformed records (missing required keys on first contact, or missing
    one of the node's fields later) are logged and skipped instead of
    terminating the thread. Other errors (e.g. failing writes to ``fd``)
    are logged and end the loop.

    ``data_store`` and ``source2roles_map`` are currently unused.
    TODO: aggregate per-role values (e.g. test node IO) into ``data_store``
    using ``source2roles_map``.
    """
    fd.write("\n")

    observed_nodes = set()
    fields_list_for_nodes = {}
    required_keys = frozenset(['time', 'source_id', 'hostname'])

    try:
        csv_fd = csv.writer(fd)
        while True:
            val = data_q.get()
            if val is None:
                break

            addr, data = val
            if addr not in observed_nodes:
                # explicit check instead of ``assert``: this is data received
                # from the network and the check must survive ``python -O``
                missing = required_keys.difference(data)
                if missing:
                    logger.warning("Drop sensor record from %s: missing keys %s",
                                   addr, sorted(missing))
                    continue

                mon_q.put(addr + (data['source_id'],))
                observed_nodes.add(addr)
                fields_list_for_nodes[addr] = sorted(set(data) - required_keys)
                csv_fd.writerow([addr[0], addr[1],
                                 data['source_id'], data['hostname']] +
                                fields_list_for_nodes[addr])

            fields = ['time'] + fields_list_for_nodes[addr]
            try:
                values = [data[name] for name in fields]
            except KeyError as exc:
                # one incomplete record must not stop data collection
                logger.warning("Drop sensor record from %s: no value for field %s",
                               addr, exc)
                continue

            csv_fd.writerow([addr[0], addr[1]] + values)
    except Exception:
        logger.exception("Error in sensors thread")
    logger.info("Sensors thread exits")
60
61
def get_sensors_config_for_nodes(cfg, nodes):
    """Select the nodes to monitor and build a SensorConfig for each of them.

    ``cfg["roles_mapping"]`` maps a role name to a comma-separated list of
    sensor names; every node whose ``roles`` contain that role gets a
    SensorConfig collecting those sensors. The receiver URL template
    (``cfg["receiver_url"]``, default DEFAULT_RECEIVER_URL) must contain an
    ``{ip}`` placeholder. The placeholder is filled with ``node.monitor_ip``
    when it is set, otherwise with
    ``utils.get_ip_for_target(node.get_ip())``.

    NOTE(review): a node matching several mapped roles is listed once per
    role, both in the returned nodes and in the configs.

    :returns: ``(monitored_nodes, sensors_configs, source2roles_map)``;
        source2roles_map maps each node's connection id to its roles.
    """
    url_template = cfg.get("receiver_url", DEFAULT_RECEIVER_URL)
    assert '{ip}' in url_template

    nodes_to_monitor = []
    configs = []
    roles_by_source = {}

    for role, sensors_str in cfg["roles_mapping"].items():
        # a separate empty options dict per sensor name
        collect_cfg = {name.strip(): {} for name in sensors_str.split(",")}

        for node in nodes:
            if role not in node.roles:
                continue

            if node.monitor_ip is None:
                listen_ip = utils.get_ip_for_target(node.get_ip())
            else:
                listen_ip = node.monitor_ip
            monitor_url = url_template.format(ip=listen_ip)

            conn_id = node.get_conn_id()
            roles_by_source[conn_id] = node.roles
            nodes_to_monitor.append(node)
            configs.append(SensorConfig(node.connection,
                                        conn_id,
                                        collect_cfg,
                                        source_id=conn_id,
                                        monitor_url=monitor_url))

    return nodes_to_monitor, configs, roles_by_source
koder aka kdanilov168f6092015-04-19 02:33:38 +030095
96
def start_sensor_process_thread(ctx, cfg, sensors_configs, source2roles_map):
    """Start receiving sensor data and a daemon thread that stores it.

    The listener is started on the configured receiver URL (``cfg``'s
    'receiver_url' or DEFAULT_RECEIVER_URL) bound to 0.0.0.0. Received
    records are written by save_sensors_data to the file named by
    ``cfg_dict['sensor_storage']``, which is opened for writing (truncated).
    A stop callback is appended to ``ctx.clear_calls_stack``. The callback
    stops the listener, shuts down and joins the writer thread, then closes
    the storage file.

    ``sensors_configs`` is accepted for interface compatibility but unused.

    :returns: queue on which each newly seen source is announced as
        ``addr + (source_id,)``.
    """
    receiver_url = cfg.get('receiver_url', DEFAULT_RECEIVER_URL)
    sensors_data_q, stop_sensors_loop = \
        start_listener_thread(receiver_url.format(ip='0.0.0.0'))

    mon_q = Queue.Queue()
    try:
        fd = open(cfg_dict['sensor_storage'], "w")
    except Exception:
        # don't leave the listener running if the storage can't be opened
        stop_sensors_loop()
        raise

    params = sensors_data_q, mon_q, fd, ctx.sensors_data, source2roles_map
    sensor_listen_th = threading.Thread(None, save_sensors_data, None,
                                        params)
    sensor_listen_th.daemon = True
    sensor_listen_th.start()

    def stop_sensors_receiver(cfg, ctx):
        try:
            stop_sensors_loop()
        finally:
            # always wake the writer (None is its stop sentinel) and wait for
            # it to exit before closing the file it writes to
            sensors_data_q.put(None)
            sensor_listen_th.join()
            fd.close()

    ctx.clear_calls_stack.append(stop_sensors_receiver)
    return mon_q
118
119
def deploy_sensors_stage(cfg, ctx, nodes=None, undeploy=True,
                         recv_timeout=10, ignore_nodata=False):
    """Deploy and start sensors on the nodes selected by ``cfg['sensors']``.

    Does nothing when the config has no 'sensors' section or when no node
    matches its roles mapping. When the section has a truthy 'online' flag,
    the stage does two extra things. It starts a receiver thread (only if
    ``ctx.sensors_mon_q`` is still None). After deploying, it waits for
    every monitored node to report data.

    :param cfg: full config; only its 'sensors' section is used here.
    :param ctx: run context; reads ``nodes`` and ``sensors_mon_q`` and
        appends cleanup callbacks to ``clear_calls_stack``.
    :param nodes: nodes to consider, defaults to ``ctx.nodes``.
    :param undeploy: register a cleanup callback that stops and removes the
        deployed sensors.
    :param recv_timeout: seconds to wait for data in online mode.
    :param ignore_nodata: in online mode, return silently instead of raising
        RuntimeError when some nodes send no data within ``recv_timeout``.
    """
    sensors_cfg = cfg.get('sensors')
    if sensors_cfg is None:
        return

    if nodes is None:
        nodes = ctx.nodes

    monitored_nodes, sensors_configs, source2roles_map = \
        get_sensors_config_for_nodes(sensors_cfg, nodes)

    if not monitored_nodes:
        logger.info("Nothing to monitor, no sensors would be installed")
        return

    is_online = sensors_cfg.get('online', False)

    if is_online and ctx.sensors_mon_q is None:
        logger.info("Start sensors data receiving thread")
        ctx.sensors_mon_q = start_sensor_process_thread(ctx, sensors_cfg,
                                                        sensors_configs,
                                                        source2roles_map)

    if undeploy:
        def remove_sensors_stage(cfg, ctx):
            # configs are rebuilt from the config passed at cleanup time
            _, configs, _ = \
                get_sensors_config_for_nodes(cfg['sensors'], nodes)
            stop_and_remove_sensors(configs)

        # registered before deploying, presumably so that a partially
        # failed deploy is still cleaned up
        ctx.clear_calls_stack.append(remove_sensors_stage)

    logger.info("Deploying new sensors on {0} node(s)".format(
        len(sensors_configs)))

    deploy_and_start_sensors(sensors_configs)

    if is_online:
        wait_for_new_sensors_data(ctx, monitored_nodes, recv_timeout,
                                  ignore_nodata)
163
164
def gather_sensors_stage(cfg, ctx, nodes=None):
    """Collect sensor results from nodes after an offline monitoring run.

    Skipped entirely when the config has no 'sensors' section or when that
    section enables 'online' mode (data was already received live).

    :param nodes: nodes to gather from, defaults to ``ctx.nodes``.
    """
    sensors_cfg = cfg.get('sensors')
    if sensors_cfg is None or sensors_cfg.get('online', False):
        return

    target_nodes = ctx.nodes if nodes is None else nodes
    configs = get_sensors_config_for_nodes(sensors_cfg, target_nodes)[1]
    gather_sensors_info(configs)
179
180
def gather_sensors_info(sensors_configs):
    """Collect sensor data for offline mode (not implemented yet).

    Called by gather_sensors_stage when sensors do not run in online mode.
    Currently a no-op: ``sensors_configs`` is ignored.
    """
    pass
koder aka kdanilov168f6092015-04-19 02:33:38 +0300183
184
def wait_for_new_sensors_data(ctx, monitored_nodes, recv_timeout,
                              ignore_nodata):
    """Block until every monitored node has announced itself on the queue.

    Reads ``ctx.sensors_mon_q``, whose items are tuples ending with the
    reporting source id (save_sensors_data puts ``addr + (source_id,)``).
    Announcements from nodes that are not in ``monitored_nodes`` are logged
    and ignored.

    :param recv_timeout: total number of seconds to wait.
    :param ignore_nodata: if true, return silently on timeout.
    :raises RuntimeError: if some nodes send nothing within ``recv_timeout``
        seconds and ``ignore_nodata`` is false.
    """
    etime = time.time() + recv_timeout

    msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
    nodes_ids = set(node.get_conn_id() for node in monitored_nodes)
    logger.debug(msg.format(recv_timeout, len(nodes_ids)))

    # wait till all nodes start sending data
    while nodes_ids:
        # Queue.get raises ValueError for a negative timeout, which happens
        # once the deadline has passed; 0 makes it raise Queue.Empty instead
        tleft = max(etime - time.time(), 0)
        try:
            source_id = ctx.sensors_mon_q.get(True, tleft)[-1]
        except Queue.Empty:
            if ignore_nodata:
                return
            msg = "Node(s) {0} not sending any sensor data in {1}s"
            silent = ", ".join(sorted(str(node_id) for node_id in nodes_ids))
            raise RuntimeError(msg.format(silent, recv_timeout))

        if source_id not in nodes_ids:
            msg = "Receive sensors from extra node: {0}".format(source_id)
            logger.warning(msg)
            continue

        nodes_ids.remove(source_id)