blob: e5b434711f14faa3a12bd2c7f3aa155b60a9c2b0 [file] [log] [blame]
koder aka kdanilov168f6092015-04-19 02:33:38 +03001import time
koder aka kdanilov57ce4db2015-04-25 21:25:51 +03002import array
koder aka kdanilov168f6092015-04-19 02:33:38 +03003import Queue
4import logging
5import threading
6
7from wally import utils
8from wally.config import cfg_dict
9from wally.sensors.api import (start_listener_thread,
10 deploy_and_start_sensors,
11 SensorConfig,
12 stop_and_remove_sensors)
13
14
# Module-wide logger for the sensors subsystem.
logger = logging.getLogger("wally.sensors")
# Default UDP receiver URL template; {ip} is substituted per node
# (or with 0.0.0.0 on the listening side).
DEFAULT_RECEIVER_URL = "udp://{ip}:5699"
koder aka kdanilov168f6092015-04-19 02:33:38 +030017
18
koder aka kdanilov57ce4db2015-04-25 21:25:51 +030019class SensorDatastore(object):
20 def __init__(self, stime=None):
21 self.lock = threading.Lock()
22 self.stime = stime
23
24 self.min_size = 60 * 60
25 self.max_size = 60 * 61
26
27 self.data = {
28 'testnodes:io': array.array("B"),
29 'testnodes:cpu': array.array("B"),
30 }
31
32 def get_values(self, name, start, end):
33 assert end >= start
34 if end == start:
35 return []
36
37 with self.lock:
38 curr_arr = self.data[name]
39 if self.stime is None:
40 return []
41
42 sidx = start - self.stime
43 eidx = end - self.stime
44
45 if sidx < 0 and eidx < 0:
46 return [0] * (end - start)
47 elif sidx < 0:
48 return [0] * (-sidx) + curr_arr[:eidx]
49 return curr_arr[sidx:eidx]
50
51 def set_values(self, start_time, vals):
52 with self.lock:
53 return self.add_values_l(start_time, vals)
54
55 def set_values_l(self, start_time, vals):
56 max_cut = 0
57 for name, values in vals.items():
58 curr_arr = self.data.setdefault(name, array.array("H"))
59
60 if self.stime is None:
61 self.stime = start_time
62
63 curr_end_time = len(curr_arr) + self.stime
64
65 if curr_end_time < start_time:
66 curr_arr.extend([0] * (start_time - curr_end_time))
67 curr_arr.extend(values)
68 elif curr_end_time > start_time:
69 logger.warning("Duplicated sensors data")
70 sindex = len(curr_arr) + (start_time - curr_end_time)
71
72 if sindex < 0:
73 values = values[-sindex:]
74 sindex = 0
75 logger.warning("Some data with timestamp before"
76 " beginning of measurememts. Skip them")
77
78 curr_arr[sindex:sindex + len(values)] = values
79 else:
80 curr_arr.extend(values)
81
82 if len(curr_arr) > self.max_size:
83 max_cut = max(len(curr_arr) - self.min_size, max_cut)
84
85 if max_cut > 0:
86 self.start_time += max_cut
87 for val in vals.values():
88 del val[:max_cut]
89
90
def save_sensors_data(data_q, mon_q, fd, data_store, source2roles_map):
    """Consume sensor packets from data_q until a None sentinel arrives.

    Every packet is dumped raw to fd, first-time senders are announced on
    mon_q, and per-second aggregates from test nodes are buffered for
    BUFFER seconds before being flushed into data_store.
    """
    fd.write("\n")

    # don't flush samples younger than this many seconds - late packets
    # for the same timestamp may still arrive
    BUFFER = 3
    observed_nodes = set()
    testnodes_data = {
        'io': {},
        'cpu': {},
    }

    try:
        while True:
            val = data_q.get()
            if val is None:  # sentinel - stop the thread
                break

            addr, data = val

            if addr not in observed_nodes:
                # first packet from this node - notify the monitor queue
                mon_q.put(addr + (data['source_id'],))
                observed_nodes.add(addr)

            fd.write(repr((addr, data)) + "\n")

            source_id = data.pop('source_id')
            rep_time = data.pop('time')
            if 'testnode' in source2roles_map.get(source_id, []):
                # NOTE(review): this accumulator never adds anything from
                # `data`, so only zeroes are ever recorded - looks
                # unfinished; confirm against the sensor packet format.
                vl = testnodes_data['io'].get(rep_time, 0)
                sum_io_q = vl
                testnodes_data['io'][rep_time] = sum_io_q

            etime = time.time() - BUFFER
            for name, vals in testnodes_data.items():
                new_vals = {}
                for rtime, value in vals.items():
                    if rtime < etime:
                        # flush settled samples.  was:
                        # set_values("testnodes:io", rtime, [value]) -
                        # mismatched SensorDatastore.set_values(start_time,
                        # vals) and hardcoded the io key for cpu data too
                        data_store.set_values(
                            rtime, {"testnodes:" + name: [value]})
                    else:
                        new_vals[rtime] = value

                vals.clear()
                vals.update(new_vals)

    except Exception:
        logger.exception("Error in sensors thread")
    logger.info("Sensors thread exits")
137
138
def get_sensors_config_for_nodes(cfg, nodes):
    """Build sensor configs for every node whose role appears in
    cfg["roles_mapping"].

    Returns (monitored_nodes, sensors_configs, source2roles_map).
    """
    monitored_nodes = []
    sensors_configs = []
    source2roles_map = {}

    receiver_url = cfg.get("receiver_url", DEFAULT_RECEIVER_URL)
    assert '{ip}' in receiver_url

    for role, sensors_str in cfg["roles_mapping"].items():
        # "sens1, sens2" -> {"sens1": {}, "sens2": {}}
        collect_cfg = dict((sensor.strip(), {})
                           for sensor in sensors_str.split(","))

        for node in nodes:
            if role not in node.roles:
                continue

            if node.monitor_url is not None:
                monitor_url = node.monitor_url
            else:
                ip = node.get_ip()
                # don't try to route towards localhost
                if ip == '127.0.0.1':
                    ext_ip = '127.0.0.1'
                else:
                    ext_ip = utils.get_ip_for_target(ip)
                monitor_url = receiver_url.format(ip=ext_ip)

            source2roles_map[node.get_conn_id()] = node.roles
            monitored_nodes.append(node)
            sensors_configs.append(
                SensorConfig(node.connection,
                             node.get_conn_id(),
                             collect_cfg,
                             source_id=node.get_conn_id(),
                             monitor_url=monitor_url))

    return monitored_nodes, sensors_configs, source2roles_map
koder aka kdanilov168f6092015-04-19 02:33:38 +0300175
176
koder aka kdanilov57ce4db2015-04-25 21:25:51 +0300177def start_sensor_process_thread(ctx, cfg, sensors_configs, source2roles_map):
koder aka kdanilova4a570f2015-04-23 22:11:40 +0300178 receiver_url = cfg.get('receiver_url', DEFAULT_RECEIVER_URL)
koder aka kdanilov168f6092015-04-19 02:33:38 +0300179 sensors_data_q, stop_sensors_loop = \
180 start_listener_thread(receiver_url.format(ip='0.0.0.0'))
181
182 mon_q = Queue.Queue()
183 fd = open(cfg_dict['sensor_storage'], "w")
koder aka kdanilov57ce4db2015-04-25 21:25:51 +0300184
185 params = sensors_data_q, mon_q, fd, ctx.sensors_data, source2roles_map
koder aka kdanilov168f6092015-04-19 02:33:38 +0300186 sensor_listen_th = threading.Thread(None, save_sensors_data, None,
koder aka kdanilov57ce4db2015-04-25 21:25:51 +0300187 params)
koder aka kdanilov168f6092015-04-19 02:33:38 +0300188 sensor_listen_th.daemon = True
189 sensor_listen_th.start()
190
191 def stop_sensors_receiver(cfg, ctx):
192 stop_sensors_loop()
193 sensors_data_q.put(None)
194 sensor_listen_th.join()
195
196 ctx.clear_calls_stack.append(stop_sensors_receiver)
197 return mon_q
198
199
def deploy_sensors_stage(cfg, ctx, nodes=None, undeploy=True):
    """Deploy and start sensors on the given nodes (default: ctx.nodes).

    Starts the shared receiving thread on first use and, when undeploy
    is True, registers a cleanup step that removes the sensors again.
    No-op if the config has no 'sensors' section.
    """
    if 'sensors' not in cfg:
        return

    cfg = cfg.get('sensors')

    if nodes is None:
        nodes = ctx.nodes

    monitored_nodes, sensors_configs, source2roles_map = \
        get_sensors_config_for_nodes(cfg, nodes)

    if len(monitored_nodes) == 0:
        logger.info("Nothing to monitor, no sensors would be installed")
        return

    if ctx.sensors_mon_q is None:
        logger.info("Start sensors data receiving thread")
        ctx.sensors_mon_q = start_sensor_process_thread(ctx, cfg,
                                                        sensors_configs,
                                                        source2roles_map)

    if undeploy:
        def remove_sensors_stage(cfg, ctx):
            # NOTE: receives the *full* config, hence cfg['sensors'] here
            _, sensors_configs, _ = \
                get_sensors_config_for_nodes(cfg['sensors'], nodes)
            stop_and_remove_sensors(sensors_configs)

        ctx.clear_calls_stack.append(remove_sensors_stage)

    # was: "Deploing ... {0}".format(len(nodes)) - fixed the spelling and
    # report the number of nodes actually monitored, not all candidates
    logger.info("Deploying new sensors on {0} node(s)"
                .format(len(monitored_nodes)))
    deploy_and_start_sensors(sensors_configs)
    wait_for_new_sensors_data(ctx, monitored_nodes)
233
234
def wait_for_new_sensors_data(ctx, monitored_nodes):
    """Block until every monitored node has reported at least once.

    Raises RuntimeError if some nodes stay silent for more than
    MAX_WAIT_FOR_SENSORS seconds.
    """
    MAX_WAIT_FOR_SENSORS = 10
    etime = time.time() + MAX_WAIT_FOR_SENSORS

    msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
    nodes_ids = set(node.get_conn_id() for node in monitored_nodes)
    logger.debug(msg.format(MAX_WAIT_FOR_SENSORS, len(nodes_ids)))

    # wait till all nodes start sending data
    while len(nodes_ids) != 0:
        # Queue.get raises ValueError for a negative timeout, so clamp
        # to zero once the deadline has passed (was: could go negative)
        tleft = max(etime - time.time(), 0)
        try:
            source_id = ctx.sensors_mon_q.get(True, tleft)[2]
        except Queue.Empty:
            msg = "Node {0} not sending any sensor data in {1}s"
            msg = msg.format(", ".join(nodes_ids), MAX_WAIT_FOR_SENSORS)
            raise RuntimeError(msg)

        if source_id not in nodes_ids:
            msg = "Receive sensors from extra node: {0}".format(source_id)
            logger.warning(msg)

        # was: unconditional remove() that raised KeyError right after
        # warning about data from an unexpected node; discard tolerates it
        nodes_ids.discard(source_id)