blob: e44704de9390b85d9231dbfe786d62343738c1b6 [file] [log] [blame]
koder aka kdanilov168f6092015-04-19 02:33:38 +03001import time
2import Queue
3import logging
4import threading
5
6from wally import utils
7from wally.config import cfg_dict
8from wally.sensors.api import (start_listener_thread,
9 deploy_and_start_sensors,
10 SensorConfig,
11 stop_and_remove_sensors)
12
13
# Logger shared by every function in this module; all of them report
# through the "wally" logger.
logger = logging.getLogger(name="wally")
15
16
def save_sensors_data(data_q, mon_q, fd):
    """Drain ``data_q`` into ``fd`` until a ``None`` sentinel arrives.

    Every item taken from ``data_q`` is unpacked as an ``(addr, data)``
    pair.  The first time a given ``addr`` is seen, ``addr`` extended with
    ``data['source_id']`` is put on ``mon_q`` (``addr`` is concatenated
    with a 1-tuple, so it has to be a tuple itself).  Each item is then
    appended to ``fd`` as one ``<timestamp> : <repr>`` line.

    Any exception raised while processing is logged and ends the loop;
    the function itself never raises.  ``fd`` is not closed here.
    """
    fd.write("\n")

    known_addrs = set()
    record_fmt = "{0!s} : {1!r}\n"

    try:
        # iter(callable, sentinel) keeps calling data_q.get() until it
        # hands back the None stop marker.
        for packet in iter(data_q.get, None):
            sender, payload = packet

            if sender not in known_addrs:
                # announce a newly observed sender to the monitoring side
                mon_q.put(sender + (payload['source_id'],))
                known_addrs.add(sender)

            fd.write(record_fmt.format(time.time(), repr(packet)))
    except Exception:
        logger.exception("Error in sensors thread")
    logger.info("Sensors thread exits")
37
38
def get_sensors_config_for_nodes(cfg, nodes):
    """Build sensor configurations for every node whose role is mapped.

    ``cfg["roles_mapping"]`` maps a role name to a comma-separated list of
    sensor names.  For each role, every node in ``nodes`` that carries the
    role gets one ``SensorConfig``.  A node matching several mapped roles
    is therefore listed once per matching role.

    The URL a node reports to is ``node.monitor_url`` when that is set.
    Otherwise it is ``cfg["receiver_url"]`` with ``{ip}`` replaced by the
    address returned by ``utils.get_ip_for_target`` for the node's ip
    (or by ``127.0.0.1`` for a local node).

    Returns a ``(monitored_nodes, sensors_configs)`` pair of parallel
    lists.
    """
    receiver_url = cfg["receiver_url"]
    assert '{ip}' in receiver_url

    monitored_nodes = []
    sensors_configs = []

    for role, sensors_str in cfg["roles_mapping"].items():
        # one settings dict per role, shared by all nodes with that role
        collect_cfg = dict((name.strip(), {})
                           for name in sensors_str.split(","))

        for node in nodes:
            if role not in node.roles:
                continue

            if node.monitor_url is not None:
                monitor_url = node.monitor_url
            else:
                ip = node.get_ip()
                ext_ip = ('127.0.0.1' if ip == '127.0.0.1'
                          else utils.get_ip_for_target(ip))
                monitor_url = receiver_url.format(ip=ext_ip)

            monitored_nodes.append(node)
            sensors_configs.append(
                SensorConfig(node.connection,
                             node.get_conn_id(),
                             collect_cfg,
                             source_id=node.get_conn_id(),
                             monitor_url=monitor_url))

    return monitored_nodes, sensors_configs
73
74
def start_sensor_process_thread(ctx, cfg, sensors_configs):
    """Start receiving sensor data and storing it in the sensor file.

    Starts the listener via ``start_listener_thread`` on
    ``cfg["receiver_url"]`` with ``{ip}`` set to ``0.0.0.0``, opens the
    file named by ``cfg_dict['sensor_storage']`` for writing and runs
    ``save_sensors_data`` in a daemon thread that copies received data
    into that file.

    A cleanup callback is appended to ``ctx.clear_calls_stack``.  When
    called it stops the listener, tells the writer thread to finish,
    waits for it and closes the storage file.

    ``sensors_configs`` is accepted for interface compatibility and is
    not used here.

    Returns the queue on which the writer thread announces each newly
    observed data source.
    """
    receiver_url = cfg["receiver_url"]
    sensors_data_q, stop_sensors_loop = \
        start_listener_thread(receiver_url.format(ip='0.0.0.0'))

    mon_q = Queue.Queue()
    fd = open(cfg_dict['sensor_storage'], "w")
    sensor_listen_th = threading.Thread(None, save_sensors_data, None,
                                        (sensors_data_q, mon_q, fd))
    sensor_listen_th.daemon = True
    sensor_listen_th.start()

    def stop_sensors_receiver(cfg, ctx):
        stop_sensors_loop()
        # None is the stop marker save_sensors_data is waiting for
        sensors_data_q.put(None)
        sensor_listen_th.join()
        # Close only after the writer thread is gone, so nothing can
        # write to a closed file and buffered records get flushed.
        fd.close()

    ctx.clear_calls_stack.append(stop_sensors_receiver)
    return mon_q
94
95
def deploy_sensors_stage(cfg, ctx, nodes=None, undeploy=True):
    """Deploy sensors to the nodes selected by the ``sensors`` config.

    Does nothing when ``cfg`` has no ``'sensors'`` section or when no node
    matches the configured roles.

    cfg -- full configuration mapping; only ``cfg['sensors']`` is used.
    ctx -- run context; ``ctx.nodes`` is used when ``nodes`` is None,
           ``ctx.sensors_mon_q`` holds the monitoring queue and
           ``ctx.clear_calls_stack`` collects cleanup callbacks.
    nodes -- optional explicit list of candidate nodes.
    undeploy -- when true, register a cleanup callback that stops and
                removes the sensors deployed by this call.

    Raises whatever ``wait_for_new_sensors_data`` raises when some node
    never starts reporting.
    """
    if 'sensors' not in cfg:
        return

    cfg = cfg.get('sensors')

    if nodes is None:
        nodes = ctx.nodes

    monitored_nodes, sensors_configs = get_sensors_config_for_nodes(cfg,
                                                                    nodes)

    if not monitored_nodes:
        logger.info("Nothing to monitor, no sensors would be installed")
        return

    # The receiving thread is shared between calls: start it only once.
    if ctx.sensors_mon_q is None:
        logger.info("Start sensors data receiving thread")
        ctx.sensors_mon_q = start_sensor_process_thread(ctx, cfg,
                                                        sensors_configs)

    if undeploy:
        # Registered before deployment starts - presumably so that a
        # partially failed deployment still gets cleaned up.
        def remove_sensors_stage(cfg, ctx):
            stop_and_remove_sensors(sensors_configs)
        ctx.clear_calls_stack.append(remove_sensors_stage)

    # Report the nodes that actually get sensors, not every candidate.
    logger.info("Deploying new sensors on {0} node(s)".format(
        len(monitored_nodes)))
    deploy_and_start_sensors(sensors_configs)
    wait_for_new_sensors_data(ctx, monitored_nodes)
125
126
def wait_for_new_sensors_data(ctx, monitored_nodes, max_wait=10):
    """Wait until every monitored node has started sending sensor data.

    ctx -- run context; entries are read from ``ctx.sensors_mon_q`` and
           the element at index 2 of each entry is taken as the id of
           the reporting source.
    monitored_nodes -- nodes to wait for; their ``get_conn_id()`` values
                       are the ids expected on the queue.
    max_wait -- total time budget in seconds for all nodes together.

    Reports from sources that are not being waited for (unknown, or
    already seen) are logged with a warning and otherwise ignored.

    Raises RuntimeError naming the still silent nodes when the time
    budget is used up.
    """
    deadline = time.time() + max_wait

    msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
    nodes_ids = set(node.get_conn_id() for node in monitored_nodes)
    logger.debug(msg.format(max_wait, len(nodes_ids)))

    # wait till all nodes start sending data
    while nodes_ids:
        tleft = deadline - time.time()
        try:
            if tleft > 0:
                entry = ctx.sensors_mon_q.get(True, tleft)
            else:
                # Queue.get() raises ValueError for a negative timeout,
                # so after the deadline only take what is already queued.
                entry = ctx.sensors_mon_q.get(False)
        except Queue.Empty:
            msg = "Node {0} not sending any sensor data in {1}s"
            silent = ", ".join(sorted(str(node_id) for node_id in nodes_ids))
            raise RuntimeError(msg.format(silent, max_wait))

        source_id = entry[2]

        if source_id not in nodes_ids:
            msg = "Receive sensors from extra node: {0}".format(source_id)
            logger.warning(msg)
            # nothing to remove for a source we are not waiting for
            continue

        nodes_ids.remove(source_id)
149 nodes_ids.remove(source_id)