koder aka kdanilov | d5ed4da | 2015-05-07 23:33:23 +0300 | [diff] [blame^] | 1 | import csv |
koder aka kdanilov | 168f609 | 2015-04-19 02:33:38 +0300 | [diff] [blame] | 2 | import time |
| 3 | import Queue |
| 4 | import logging |
| 5 | import threading |
| 6 | |
| 7 | from wally import utils |
| 8 | from wally.config import cfg_dict |
| 9 | from wally.sensors.api import (start_listener_thread, |
| 10 | deploy_and_start_sensors, |
| 11 | SensorConfig, |
| 12 | stop_and_remove_sensors) |
| 13 | |
# Module-level logger; every function in this file reports through it.
logger = logging.getLogger("wally.sensors")

# Fallback receiver URL template, used when the sensors config has no
# "receiver_url" entry. The "{ip}" placeholder is substituted via
# str.format(ip=...) before the URL is used.
DEFAULT_RECEIVER_URL = "udp://{ip}:5699"
koder aka kdanilov | 168f609 | 2015-04-19 02:33:38 +0300 | [diff] [blame] | 16 | |
| 17 | |
def save_sensors_data(data_q, mon_q, fd, data_store, source2roles_map):
    """Drain sensor reports from ``data_q`` and append them to ``fd`` as CSV.

    Runs until ``None`` is read from ``data_q``. Every other queue item must
    be an ``(addr, data)`` pair, where ``addr`` is a tuple with at least two
    elements and ``data`` is a dict containing at least the keys ``'time'``,
    ``'source_id'`` and ``'hostname'``.

    The first report seen from an address:
      * puts ``addr + (source_id,)`` on ``mon_q``;
      * fixes the column order for that address (remaining keys, sorted);
      * writes a header row ``addr[0], addr[1], source_id, hostname, <cols>``.

    Every report (including the first) writes a data row
    ``addr[0], addr[1], time, <values in column order>``.

    Any exception is logged and terminates the loop; it is never propagated.

    ``data_store`` and ``source2roles_map`` are accepted for interface
    compatibility but are not used by the active code (see the disabled
    aggregation snippet at the end of the loop).
    """
    fd.write("\n")

    # addr -> sorted list of sensor field names; doubles as "seen" registry
    fields_list_for_nodes = {}
    required_keys = set(['time', 'source_id', 'hostname'])

    try:
        csv_fd = csv.writer(fd)
        while True:
            val = data_q.get()
            if val is None:
                # sentinel: the receiver was asked to stop
                break

            addr, data = val
            if addr not in fields_list_for_nodes:
                # Explicit check instead of ``assert`` so that validation
                # survives ``python -O`` and names the offending fields.
                missing = required_keys.difference(data)
                if missing:
                    msg = "Sensor data from {0} lacks required field(s): {1}"
                    raise ValueError(msg.format(addr,
                                                ", ".join(sorted(missing))))

                mon_q.put(addr + (data['source_id'],))

                fields = sorted(set(data) - required_keys)
                fields_list_for_nodes[addr] = fields
                csv_fd.writerow([addr[0], addr[1],
                                 data['source_id'], data['hostname']] +
                                fields)

            fields = fields_list_for_nodes[addr]
            # A list comprehension (not ``map``) keeps the concatenation
            # valid on interpreters where ``map`` returns an iterator.
            csv_fd.writerow([addr[0], addr[1], data['time']] +
                            [data[name] for name in fields])

            # Disabled aggregation, kept for reference:
            # fd.write(repr((addr, data)) + "\n")
            # source_id = data.pop('source_id')
            # rep_time = data.pop('time')
            # if 'testnode' in source2roles_map.get(source_id, []):
            #     sum_io_q = 0
            #     data_store.update_values(rep_time,
            #                              {"testnodes:io": sum_io_q},
            #                              add=True)
    except Exception:
        logger.exception("Error in sensors thread")
    logger.info("Sensors thread exits")
| 60 | |
| 61 | |
def get_sensors_config_for_nodes(cfg, nodes):
    """Build sensor configurations for every node whose role is monitored.

    ``cfg["roles_mapping"]`` maps a role name to a comma-separated string of
    sensor names. The optional ``cfg["receiver_url"]`` is a URL template that
    must contain ``{ip}``; ``DEFAULT_RECEIVER_URL`` is used when it is absent.

    For each (role, node) pair where the node has that role, a
    ``SensorConfig`` is created. The receiver IP is ``node.monitor_ip`` when
    it is set, otherwise the result of ``utils.get_ip_for_target`` for the
    node's own IP. A node that matches several mapped roles is therefore
    listed once per matching role.

    Returns a tuple ``(monitored_nodes, sensors_configs, source2roles_map)``
    where ``source2roles_map`` maps ``node.get_conn_id()`` to ``node.roles``.
    """
    url_template = cfg.get("receiver_url", DEFAULT_RECEIVER_URL)
    assert '{ip}' in url_template

    monitored_nodes = []
    sensors_configs = []
    source2roles_map = {}

    for role, sensors_str in cfg["roles_mapping"].items():
        # one (initially empty) settings dict per requested sensor;
        # the same mapping object is handed to every node of this role
        collect_cfg = {}
        for sensor_name in sensors_str.split(","):
            collect_cfg[sensor_name.strip()] = {}

        for node in nodes:
            if role not in node.roles:
                continue

            if node.monitor_ip is None:
                # no explicit monitoring address: derive one from the node IP
                target_ip = utils.get_ip_for_target(node.get_ip())
            else:
                target_ip = node.monitor_ip
            monitor_url = url_template.format(ip=target_ip)

            source2roles_map[node.get_conn_id()] = node.roles
            monitored_nodes.append(node)
            sensors_configs.append(
                SensorConfig(node.connection,
                             node.get_conn_id(),
                             collect_cfg,
                             source_id=node.get_conn_id(),
                             monitor_url=monitor_url))

    return monitored_nodes, sensors_configs, source2roles_map
koder aka kdanilov | 168f609 | 2015-04-19 02:33:38 +0300 | [diff] [blame] | 95 | |
| 96 | |
def start_sensor_process_thread(ctx, cfg, sensors_configs, source2roles_map):
    """Start the sensor listener and the thread that stores its data.

    The listener is started on the receiver URL from ``cfg`` (or
    ``DEFAULT_RECEIVER_URL``) with ``{ip}`` replaced by ``0.0.0.0``. A daemon
    thread running ``save_sensors_data`` then writes incoming reports to the
    file named by ``cfg_dict['sensor_storage']``.

    A ``stop_sensors_receiver(cfg, ctx)`` callback is appended to
    ``ctx.clear_calls_stack``; it stops the listener, sends the ``None``
    sentinel to the writer thread, waits for it to exit and closes the
    storage file.

    ``sensors_configs`` is accepted but not used here.

    Returns the queue on which the writer thread announces every newly seen
    data source.
    """
    receiver_url = cfg.get('receiver_url', DEFAULT_RECEIVER_URL)
    sensors_data_q, stop_sensors_loop = \
        start_listener_thread(receiver_url.format(ip='0.0.0.0'))

    mon_q = Queue.Queue()
    fd = open(cfg_dict['sensor_storage'], "w")

    try:
        params = sensors_data_q, mon_q, fd, ctx.sensors_data, source2roles_map
        sensor_listen_th = threading.Thread(None, save_sensors_data, None,
                                            params)
        sensor_listen_th.daemon = True
        sensor_listen_th.start()
    except Exception:
        # nobody else owns the file yet, so do not leak it
        fd.close()
        raise

    def stop_sensors_receiver(cfg, ctx):
        try:
            stop_sensors_loop()
            sensors_data_q.put(None)
            sensor_listen_th.join()
        finally:
            # The writer thread has exited (or shutdown failed): release the
            # file so buffered rows are flushed and the handle is not leaked.
            fd.close()

    ctx.clear_calls_stack.append(stop_sensors_receiver)
    return mon_q
| 118 | |
| 119 | |
def deploy_sensors_stage(cfg, ctx, nodes=None, undeploy=True,
                         recv_timeout=10, ignore_nodata=False):
    """Deploy sensors to the monitored nodes and wait for their first data.

    Does nothing when ``cfg`` has no ``'sensors'`` section or when no node
    matches a monitored role. ``nodes`` defaults to ``ctx.nodes``.

    If ``ctx.sensors_mon_q`` is ``None`` the receiving thread is started and
    its monitoring queue is stored there. When ``undeploy`` is true, a
    callback that stops and removes the sensors is appended to
    ``ctx.clear_calls_stack`` before deployment begins.

    ``recv_timeout`` and ``ignore_nodata`` are forwarded to
    ``wait_for_new_sensors_data``.
    """
    if 'sensors' not in cfg:
        return

    cfg = cfg.get('sensors')
    if nodes is None:
        nodes = ctx.nodes

    configs_info = get_sensors_config_for_nodes(cfg, nodes)
    monitored_nodes, sensors_configs, source2roles_map = configs_info

    if not monitored_nodes:
        logger.info("Nothing to monitor, no sensors would be installed")
        return

    if ctx.sensors_mon_q is None:
        logger.info("Start sensors data receiving thread")
        mon_q = start_sensor_process_thread(ctx, cfg,
                                            sensors_configs,
                                            source2roles_map)
        ctx.sensors_mon_q = mon_q

    if undeploy:
        def remove_sensors_stage(cfg, ctx):
            # configs are rebuilt from the full config passed to the callback
            configs_to_remove = \
                get_sensors_config_for_nodes(cfg['sensors'], nodes)[1]
            stop_and_remove_sensors(configs_to_remove)

        ctx.clear_calls_stack.append(remove_sensors_stage)

    deploy_msg = "Deploing new sensors on {0} node(s)"
    logger.info(deploy_msg.format(len(sensors_configs)))

    deploy_and_start_sensors(sensors_configs)
    wait_for_new_sensors_data(ctx, monitored_nodes, recv_timeout,
                              ignore_nodata)
koder aka kdanilov | 168f609 | 2015-04-19 02:33:38 +0300 | [diff] [blame] | 158 | |
| 159 | |
def wait_for_new_sensors_data(ctx, monitored_nodes, recv_timeout,
                              ignore_nodata):
    """Block until every monitored node has reported sensor data.

    Reads announcements from ``ctx.sensors_mon_q``; element ``[2]`` of each
    announcement is taken as the source id and matched against
    ``node.get_conn_id()`` of the nodes in ``monitored_nodes``.

    ``recv_timeout`` is the total time budget in seconds for all nodes.
    Announcements from sources that are not being waited for are logged with
    a warning and otherwise ignored.

    Returns ``None`` once all nodes have reported, or when the budget is
    exhausted and ``ignore_nodata`` is true.

    Raises ``RuntimeError`` when the budget is exhausted, some nodes are
    still silent and ``ignore_nodata`` is false.
    """
    etime = time.time() + recv_timeout

    msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
    nodes_ids = set(node.get_conn_id() for node in monitored_nodes)
    logger.debug(msg.format(recv_timeout, len(nodes_ids)))

    # wait till all nodes start sending data
    while nodes_ids:
        tleft = etime - time.time()
        try:
            if tleft > 0:
                source_id = ctx.sensors_mon_q.get(True, tleft)[2]
            else:
                # Deadline passed. A negative timeout makes Queue.get raise
                # ValueError, so only pick up what is already queued.
                source_id = ctx.sensors_mon_q.get(False)[2]
        except Queue.Empty:
            if not ignore_nodata:
                msg = "Node(s) {0} not sending any sensor data in {1}s"
                msg = msg.format(", ".join(sorted(nodes_ids)), recv_timeout)
                raise RuntimeError(msg)
            else:
                return

        if source_id not in nodes_ids:
            msg = "Receive sensors from extra node: {0}".format(source_id)
            logger.warning(msg)

        # discard(), not remove(): an unexpected source must not raise
        # KeyError right after it has been reported with a warning.
        nodes_ids.discard(source_id)