blob: 1bd21642514d909866286336cdc44d7dd64c8e44 [file] [log] [blame]
koder aka kdanilov168f6092015-04-19 02:33:38 +03001import time
2import Queue
3import logging
4import threading
5
6from wally import utils
7from wally.config import cfg_dict
8from wally.sensors.api import (start_listener_thread,
9 deploy_and_start_sensors,
10 SensorConfig,
11 stop_and_remove_sensors)
12
13
# Module logger, registered under the "wally" name.
logger = logging.getLogger("wally")
# Default URL template for the sensor data receiver.  The "{ip}" placeholder
# is filled in with str.format() by the functions in this module.
DEFAULT_RECEIVER_URL = "udp://{ip}:5699"
koder aka kdanilov168f6092015-04-19 02:33:38 +030016
17
def save_sensors_data(data_q, mon_q, fd):
    """Drain sensor packets from data_q and append them to fd.

    Items taken from data_q are unpacked as ``(addr, data)`` pairs; a
    ``None`` item stops the loop.  The first time a given ``addr`` is
    seen, ``addr + (data['source_id'],)`` is put on mon_q.  Every packet
    is written to fd as one line: the current timestamp, " : ", and the
    packet representation.

    Exceptions raised inside the loop are logged and end the loop; they
    are not re-raised.
    """
    fd.write("\n")

    seen_addrs = set()

    try:
        packet = data_q.get()
        while packet is not None:
            addr, data = packet

            # announce every source address exactly once
            if addr not in seen_addrs:
                mon_q.put(addr + (data['source_id'],))
                seen_addrs.add(addr)

            stamp = time.time()
            # NOTE: the format spec applies !r on top of this repr(), so the
            # packet ends up in the file as a quoted string literal.
            packet_repr = repr((addr, data))
            fd.write("{0!s} : {1!r}\n".format(stamp, packet_repr))

            packet = data_q.get()
    except Exception:
        logger.exception("Error in sensors thread")

    logger.info("Sensors thread exits")
40
41
def get_sensors_config_for_nodes(cfg, nodes):
    """Build a SensorConfig for every node that has a monitored role.

    cfg   -- mapping with a "roles_mapping" entry (role name -> string of
             comma-separated sensor names) and an optional "receiver_url"
             template, which must contain "{ip}"
    nodes -- sequence of node objects; it is walked once per role

    Returns a ``(monitored_nodes, sensors_configs)`` pair of lists.  A
    node is appended once for each of its roles found in the mapping.
    """
    receiver_url = cfg.get("receiver_url", DEFAULT_RECEIVER_URL)
    assert '{ip}' in receiver_url

    monitored_nodes = []
    sensors_configs = []

    for role, sensors_str in cfg["roles_mapping"].items():
        # One settings dict per role; the same object is handed to every
        # node that has this role.
        collect_cfg = {}
        for sensor_name in sensors_str.split(","):
            collect_cfg[sensor_name.strip()] = {}

        for node in nodes:
            if role not in node.roles:
                continue

            monitor_url = node.monitor_url
            if monitor_url is None:
                # No explicit URL on the node: derive one from the template.
                node_ip = node.get_ip()
                if node_ip != '127.0.0.1':
                    ext_ip = utils.get_ip_for_target(node_ip)
                else:
                    ext_ip = '127.0.0.1'
                monitor_url = receiver_url.format(ip=ext_ip)

            conn_id = node.get_conn_id()
            source_id = node.get_conn_id()

            monitored_nodes.append(node)
            sensors_configs.append(SensorConfig(node.connection,
                                                conn_id,
                                                collect_cfg,
                                                source_id=source_id,
                                                monitor_url=monitor_url))

    return monitored_nodes, sensors_configs
76
77
def start_sensor_process_thread(ctx, cfg, sensors_configs):
    """Start the sensor data listener and the thread that stores its data.

    ctx             -- context object; a stop callback is appended to its
                       ``clear_calls_stack`` list
    cfg             -- sensors config mapping; the optional "receiver_url"
                       template is formatted with ip='0.0.0.0' to get the
                       listening URL
    sensors_configs -- not used by this function; kept so the signature
                       stays the same for callers

    Received data is written by save_sensors_data() to the file named by
    ``cfg_dict['sensor_storage']``.

    Returns the queue on which save_sensors_data() announces new sources.
    """
    receiver_url = cfg.get('receiver_url', DEFAULT_RECEIVER_URL)
    sensors_data_q, stop_sensors_loop = \
        start_listener_thread(receiver_url.format(ip='0.0.0.0'))

    mon_q = Queue.Queue()
    fd = open(cfg_dict['sensor_storage'], "w")
    sensor_listen_th = threading.Thread(None, save_sensors_data, None,
                                        (sensors_data_q, mon_q, fd))
    sensor_listen_th.daemon = True
    sensor_listen_th.start()

    def stop_sensors_receiver(cfg, ctx):
        stop_sensors_loop()
        # None is the stop marker save_sensors_data() waits for
        sensors_data_q.put(None)
        sensor_listen_th.join()
        # The writer thread has exited, so nothing writes to the file any
        # more: close it to flush buffered data and release the handle.
        fd.close()

    ctx.clear_calls_stack.append(stop_sensors_receiver)
    return mon_q
97
98
def deploy_sensors_stage(cfg, ctx, nodes=None, undeploy=True):
    """Deploy sensors to the monitored nodes and wait for their first data.

    cfg      -- config mapping; nothing is done unless it has a 'sensors'
                entry
    ctx      -- context object; ``ctx.nodes`` is used when nodes is None,
                ``ctx.sensors_mon_q`` holds the monitoring queue, and
                cleanup callbacks are appended to ``ctx.clear_calls_stack``
    nodes    -- nodes to consider; defaults to ``ctx.nodes``
    undeploy -- when true, register a cleanup callback that stops and
                removes the sensors deployed here

    Returns None.
    """
    if 'sensors' not in cfg:
        return

    cfg = cfg.get('sensors')

    if nodes is None:
        nodes = ctx.nodes

    monitored_nodes, sensors_configs = get_sensors_config_for_nodes(cfg,
                                                                    nodes)

    if not monitored_nodes:
        logger.info("Nothing to monitor, no sensors would be installed")
        return

    if ctx.sensors_mon_q is None:
        logger.info("Start sensors data receiving thread")
        ctx.sensors_mon_q = start_sensor_process_thread(ctx, cfg,
                                                        sensors_configs)

    # The cleanup is registered before deployment starts, so it is already
    # on the stack if deploy_and_start_sensors() fails part-way.
    if undeploy:
        def remove_sensors_stage(cfg, ctx):
            stop_and_remove_sensors(sensors_configs)
        ctx.clear_calls_stack.append(remove_sensors_stage)

    # Report the nodes that actually get sensors, not every candidate node.
    msg = "Deploing new sensors on {0} node(s)"
    logger.info(msg.format(len(monitored_nodes)))
    deploy_and_start_sensors(sensors_configs)
    wait_for_new_sensors_data(ctx, monitored_nodes)
128
129
def wait_for_new_sensors_data(ctx, monitored_nodes):
    """Block until every monitored node has reported sensor data.

    ctx             -- context object; ``ctx.sensors_mon_q`` is the queue
                       that new data sources are announced on
    monitored_nodes -- nodes whose ``get_conn_id()`` values are expected
                       to show up as source ids

    Raises RuntimeError if some nodes have not reported within
    MAX_WAIT_FOR_SENSORS seconds.  Reports from unexpected sources are
    logged with a warning and otherwise ignored.
    """
    MAX_WAIT_FOR_SENSORS = 10
    etime = time.time() + MAX_WAIT_FOR_SENSORS

    msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
    nodes_ids = set(node.get_conn_id() for node in monitored_nodes)
    logger.debug(msg.format(MAX_WAIT_FOR_SENSORS, len(nodes_ids)))

    # wait till all nodes start sending data
    while nodes_ids:
        tleft = etime - time.time()
        try:
            if tleft > 0:
                item = ctx.sensors_mon_q.get(True, tleft)
            else:
                # Deadline passed: Queue.get() rejects a negative timeout
                # with ValueError, so only take what is already queued.
                item = ctx.sensors_mon_q.get_nowait()
        except Queue.Empty:
            msg = "Node {0} not sending any sensor data in {1}s"
            msg = msg.format(", ".join(nodes_ids), MAX_WAIT_FOR_SENSORS)
            raise RuntimeError(msg)

        # save_sensors_data() queues ``addr + (source_id,)``; index 2
        # assumes addr is a 2-tuple - TODO confirm
        source_id = item[2]

        if source_id not in nodes_ids:
            msg = "Receive sensors from extra node: {0}".format(source_id)
            logger.warning(msg)
            # nothing to remove for an unknown source; set.remove() would
            # raise KeyError here
            continue

        nodes_ids.remove(source_id)