koder aka kdanilov | 168f609 | 2015-04-19 02:33:38 +0300 | [diff] [blame] | 1 | import time |
koder aka kdanilov | 57ce4db | 2015-04-25 21:25:51 +0300 | [diff] [blame^] | 2 | import array |
koder aka kdanilov | 168f609 | 2015-04-19 02:33:38 +0300 | [diff] [blame] | 3 | import Queue |
| 4 | import logging |
| 5 | import threading |
| 6 | |
| 7 | from wally import utils |
| 8 | from wally.config import cfg_dict |
| 9 | from wally.sensors.api import (start_listener_thread, |
| 10 | deploy_and_start_sensors, |
| 11 | SensorConfig, |
| 12 | stop_and_remove_sensors) |
| 13 | |
| 14 | |
# Logger used by every function and class in this module.
logger = logging.getLogger("wally.sensors")

# Template of the URL sensors report to; the "{ip}" placeholder is filled in
# with str.format(ip=...) before use.
DEFAULT_RECEIVER_URL = "udp://{ip}:5699"
koder aka kdanilov | 168f609 | 2015-04-19 02:33:38 +0300 | [diff] [blame] | 17 | |
| 18 | |
class SensorDatastore(object):
    """Lock-protected store of named sensor series sharing one time origin.

    Each series is an ``array.array``; the element at index ``i`` belongs to
    timestamp ``self.stime + i``.  Gaps are filled with zeros, and once a
    series grows past ``max_size`` elements the oldest data of every series
    is dropped so that series keeps ``min_size`` elements.
    """

    def __init__(self, stime=None):
        """Create an empty store.

        stime -- timestamp of index 0; when None it is taken from the first
                 call that stores data.
        """
        self.lock = threading.Lock()
        self.stime = stime

        # A series longer than max_size is trimmed back to min_size.
        self.min_size = 60 * 60
        self.max_size = 60 * 61

        # NOTE(review): the predeclared series use typecode "B" (0..255)
        # while series created on demand in set_values_l use "H" (0..65535);
        # confirm the value range the sensors really produce.
        self.data = {
            'testnodes:io': array.array("B"),
            'testnodes:cpu': array.array("B"),
        }

    def get_values(self, name, start, end):
        """Return the values of series `name` for timestamps [start, end).

        Timestamps before ``self.stime`` are reported as zeros.  The result
        may be shorter than ``end - start`` when the series ends earlier.
        Returns an empty list when the range is empty or nothing has been
        stored yet.  Raises KeyError for an unknown series name.
        """
        assert end >= start
        if end == start:
            return []

        with self.lock:
            curr_arr = self.data[name]
            if self.stime is None:
                return []

            # Indexes must be integers even if float timestamps are passed.
            sidx = int(start - self.stime)
            eidx = int(end - self.stime)

            if eidx <= 0:
                # The whole range lies before the first stored sample.
                return [0] * (eidx - sidx)
            if sidx < 0:
                # list + array is a TypeError, so convert the slice first.
                return [0] * (-sidx) + list(curr_arr[:eidx])
            return curr_arr[sidx:eidx]

    def set_values(self, start_time, vals):
        """Store `vals` under the lock; see set_values_l for the arguments."""
        with self.lock:
            return self.set_values_l(start_time, vals)

    def set_values_l(self, start_time, vals):
        """Store values without taking the lock (the caller must hold it).

        start_time -- timestamp of the first element of every sequence
        vals       -- dict mapping series name to a sequence of values

        Data overlapping already stored samples overwrites them; data older
        than ``self.stime`` is skipped.  Both cases are logged as warnings.
        """
        start_time = int(start_time)
        max_cut = 0
        for name, values in vals.items():
            curr_arr = self.data.setdefault(name, array.array("H"))

            if self.stime is None:
                self.stime = start_time

            curr_end_time = len(curr_arr) + self.stime

            if curr_end_time < start_time:
                # Fill the gap between stored and incoming data with zeros.
                curr_arr.extend([0] * (start_time - curr_end_time))
                curr_arr.extend(values)
            elif curr_end_time > start_time:
                logger.warning("Duplicated sensors data")
                sindex = len(curr_arr) + (start_time - curr_end_time)

                if sindex < 0:
                    values = values[-sindex:]
                    sindex = 0
                    logger.warning("Some data with timestamp before"
                                   " beginning of measurememts. Skip them")

                # An array slice accepts only another array on assignment.
                curr_arr[sindex:sindex + len(values)] = \
                    array.array(curr_arr.typecode, values)
            else:
                curr_arr.extend(values)

            if len(curr_arr) > self.max_size:
                max_cut = max(len(curr_arr) - self.min_size, max_cut)

        if max_cut > 0:
            # Drop the same number of leading samples from every stored
            # series so that they all stay aligned to the new origin.
            self.stime += max_cut
            for stored in self.data.values():
                del stored[:max_cut]
| 89 | |
| 90 | |
def save_sensors_data(data_q, mon_q, fd, data_store, source2roles_map):
    """Consume sensor reports from `data_q` until a None item arrives.

    data_q           -- queue of ``(addr, data)`` items; ``addr`` is a tuple
                        and ``data`` a dict with at least the 'source_id'
                        and 'time' keys.  None stops the loop.
    mon_q            -- queue that receives ``addr + (source_id,)`` the
                        first time each address is seen
    fd               -- writable file object; every report is appended to
                        it as one ``repr()`` line
    data_store       -- object with ``set_values(start_time, vals)``
    source2roles_map -- dict mapping source id to the roles of that node

    Any exception is logged and ends the loop; nothing is re-raised.
    """
    fd.write("\n")

    # Values are handed to the store only once they are older than this
    # many seconds.
    BUFFER = 3
    observed_nodes = set()
    testnodes_data = {
        'io': {},
        'cpu': {},
    }

    try:
        while True:
            val = data_q.get()
            if val is None:
                break

            addr, data = val

            if addr not in observed_nodes:
                mon_q.put(addr + (data['source_id'],))
                observed_nodes.add(addr)

            fd.write(repr((addr, data)) + "\n")

            source_id = data.pop('source_id')
            rep_time = data.pop('time')
            if 'testnode' in source2roles_map.get(source_id, []):
                # NOTE(review): nothing from `data` is added to the sum, so
                # the stored value is always 0 - presumably the io queue
                # readings were meant to be accumulated here; confirm.
                testnodes_data['io'].setdefault(rep_time, 0)

            etime = time.time() - BUFFER
            for name, vals in testnodes_data.items():
                series = "testnodes:" + name
                pending = {}
                # Flush in chronological order so the store sees
                # monotonically growing timestamps.
                for rtime in sorted(vals):
                    value = vals[rtime]
                    if rtime < etime:
                        # set_values takes (start_time, {series: values}).
                        data_store.set_values(rtime, {series: [value]})
                    else:
                        pending[rtime] = value

                vals.clear()
                vals.update(pending)

    except Exception:
        logger.exception("Error in sensors thread")
    logger.info("Sensors thread exits")
| 137 | |
| 138 | |
def get_sensors_config_for_nodes(cfg, nodes):
    """Build sensor configs for every node whose role is listed in `cfg`.

    cfg   -- sensors config; ``cfg["roles_mapping"]`` maps a role name to a
             comma-separated string of sensor names, and the optional
             "receiver_url" must contain the "{ip}" placeholder
    nodes -- iterable of node objects

    Returns ``(monitored_nodes, sensors_configs, source2roles_map)``.  A
    node is added once per matching role.
    """
    monitored_nodes = []
    sensors_configs = []
    source2roles_map = {}

    url_template = cfg.get("receiver_url", DEFAULT_RECEIVER_URL)
    assert '{ip}' in url_template

    for role, sensors_str in cfg["roles_mapping"].items():
        # Every sensor of this role is collected with empty settings.
        collect_cfg = {}
        for sensor_name in sensors_str.split(","):
            collect_cfg[sensor_name.strip()] = {}

        for node in nodes:
            if role not in node.roles:
                continue

            monitor_url = node.monitor_url
            if monitor_url is None:
                node_ip = node.get_ip()
                # A local node reports to the loopback address; otherwise
                # utils picks the address to use for that target.
                if node_ip != '127.0.0.1':
                    ext_ip = utils.get_ip_for_target(node_ip)
                else:
                    ext_ip = '127.0.0.1'
                monitor_url = url_template.format(ip=ext_ip)

            conn_id = node.get_conn_id()
            source2roles_map[conn_id] = node.roles
            monitored_nodes.append(node)
            sensors_configs.append(SensorConfig(node.connection,
                                                conn_id,
                                                collect_cfg,
                                                source_id=conn_id,
                                                monitor_url=monitor_url))

    return monitored_nodes, sensors_configs, source2roles_map
koder aka kdanilov | 168f609 | 2015-04-19 02:33:38 +0300 | [diff] [blame] | 175 | |
| 176 | |
def start_sensor_process_thread(ctx, cfg, sensors_configs, source2roles_map):
    """Start the sensor listener and the thread that stores its data.

    ctx              -- run context; ``ctx.sensors_data`` is passed to the
                        storing thread as its data store and a stop
                        callback is appended to ``ctx.clear_calls_stack``
    cfg              -- sensors config (optional "receiver_url" key)
    sensors_configs  -- unused here; kept for interface compatibility
    source2roles_map -- dict mapping source id to node roles

    Returns the queue on which newly seen nodes are announced.
    """
    receiver_url = cfg.get('receiver_url', DEFAULT_RECEIVER_URL)
    sensors_data_q, stop_sensors_loop = \
        start_listener_thread(receiver_url.format(ip='0.0.0.0'))

    mon_q = Queue.Queue()
    try:
        fd = open(cfg_dict['sensor_storage'], "w")
    except Exception:
        # Do not leave the listener running when the storage file cannot
        # be opened.
        stop_sensors_loop()
        raise

    params = sensors_data_q, mon_q, fd, ctx.sensors_data, source2roles_map
    sensor_listen_th = threading.Thread(None, save_sensors_data, None,
                                        params)
    sensor_listen_th.daemon = True
    sensor_listen_th.start()

    def stop_sensors_receiver(cfg, ctx):
        try:
            stop_sensors_loop()
            # None tells save_sensors_data to leave its loop.
            sensors_data_q.put(None)
            sensor_listen_th.join()
        finally:
            # The storage file was never closed before; release it once
            # the writer thread is done with it.
            fd.close()

    ctx.clear_calls_stack.append(stop_sensors_receiver)
    return mon_q
| 198 | |
| 199 | |
def deploy_sensors_stage(cfg, ctx, nodes=None, undeploy=True):
    """Deploy sensors to the nodes selected by the 'sensors' config section.

    cfg      -- full config; nothing is done when it has no 'sensors' key
    ctx      -- run context (``nodes``, ``sensors_mon_q`` and
                ``clear_calls_stack`` are used)
    nodes    -- nodes to consider; defaults to ``ctx.nodes``
    undeploy -- when true, a callback removing the sensors is appended to
                ``ctx.clear_calls_stack``
    """
    if 'sensors' not in cfg:
        return

    sensors_cfg = cfg.get('sensors')
    nodes = ctx.nodes if nodes is None else nodes

    found = get_sensors_config_for_nodes(sensors_cfg, nodes)
    monitored_nodes, sensors_configs, source2roles_map = found

    if not monitored_nodes:
        logger.info("Nothing to monitor, no sensors would be installed")
        return

    # The receiving thread is started only once per context.
    if ctx.sensors_mon_q is None:
        logger.info("Start sensors data receiving thread")
        mon_q = start_sensor_process_thread(ctx, sensors_cfg,
                                            sensors_configs,
                                            source2roles_map)
        ctx.sensors_mon_q = mon_q

    if undeploy:
        def remove_sensors_stage(cfg, ctx):
            # Configs are rebuilt from the config passed at cleanup time.
            _, configs_to_remove, _ = \
                get_sensors_config_for_nodes(cfg['sensors'], nodes)
            stop_and_remove_sensors(configs_to_remove)

        # Registered before deploying, so cleanup also runs if the
        # deployment below fails.
        ctx.clear_calls_stack.append(remove_sensors_stage)

    logger.info("Deploing new sensors on {0} node(s)".format(len(nodes)))
    deploy_and_start_sensors(sensors_configs)
    wait_for_new_sensors_data(ctx, monitored_nodes)
| 233 | |
| 234 | |
def wait_for_new_sensors_data(ctx, monitored_nodes):
    """Block until every monitored node has reported sensor data.

    ctx             -- run context; ``ctx.sensors_mon_q`` yields tuples
                       whose third element is the reporting source id
    monitored_nodes -- nodes expected to report (``get_conn_id()`` is used)

    Raises RuntimeError when some node has not reported within
    MAX_WAIT_FOR_SENSORS seconds.
    """
    MAX_WAIT_FOR_SENSORS = 10
    etime = time.time() + MAX_WAIT_FOR_SENSORS

    msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
    nodes_ids = set(node.get_conn_id() for node in monitored_nodes)
    logger.debug(msg.format(MAX_WAIT_FOR_SENSORS, len(nodes_ids)))

    # wait till all nodes start sending data
    while len(nodes_ids) != 0:
        tleft = etime - time.time()
        try:
            if tleft > 0:
                source_id = ctx.sensors_mon_q.get(True, tleft)[2]
            else:
                # A negative timeout makes Queue.get raise ValueError;
                # once the deadline passed only take what is queued.
                source_id = ctx.sensors_mon_q.get(False)[2]
        except Queue.Empty:
            msg = "Node {0} not sending any sensor data in {1}s"
            msg = msg.format(", ".join(map(str, nodes_ids)),
                             MAX_WAIT_FOR_SENSORS)
            raise RuntimeError(msg)

        if source_id not in nodes_ids:
            msg = "Receive sensors from extra node: {0}".format(source_id)
            logger.warning(msg)
            # Nothing to remove for an unexpected node; set.remove would
            # raise KeyError here.
            continue

        nodes_ids.remove(source_id)