blob: 992c7eb3771684d52f8ee203161658d162a47dcd [file] [log] [blame]
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08001import sys
koder aka kdanilovda45e882015-04-06 02:24:42 +03002import json
koder aka kdanilov2c473092015-03-29 17:12:13 +03003import Queue
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08004import pprint
koder aka kdanilove21d7472015-02-14 19:02:04 -08005import logging
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08006import argparse
koder aka kdanilov2c473092015-03-29 17:12:13 +03007import threading
8import collections
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08009
koder aka kdanilov2c473092015-03-29 17:12:13 +030010from concurrent.futures import ThreadPoolExecutor
11
12import utils
koder aka kdanilove06762a2015-03-22 23:32:09 +020013import ssh_utils
koder aka kdanilovda45e882015-04-06 02:24:42 +030014import start_vms
koder aka kdanilove06762a2015-03-22 23:32:09 +020015from nodes import discover
koder aka kdanilov2c473092015-03-29 17:12:13 +030016from nodes.node import Node
Yulia Portnova0e64ea22015-03-20 17:27:22 +020017from config import cfg_dict
koder aka kdanilovda45e882015-04-06 02:24:42 +030018from tests.itest import IOPerfTest, PgBenchTest
Yulia Portnova7ddfa732015-02-24 17:32:58 +020019
koder aka kdanilov2c473092015-03-29 17:12:13 +030020from sensors.api import start_monitoring
21
22
koder aka kdanilove21d7472015-02-14 19:02:04 -080023logger = logging.getLogger("io-perf-tool")
koder aka kdanilove21d7472015-02-14 19:02:04 -080024
25
koder aka kdanilov2c473092015-03-29 17:12:13 +030026def setup_logger(logger, level=logging.DEBUG):
27 logger.setLevel(level)
28 ch = logging.StreamHandler()
29 ch.setLevel(level)
30 logger.addHandler(ch)
Yulia Portnova7ddfa732015-02-24 17:32:58 +020031
koder aka kdanilov2c473092015-03-29 17:12:13 +030032 log_format = '%(asctime)s - %(levelname)-6s - %(name)s - %(message)s'
33 formatter = logging.Formatter(log_format,
34 "%H:%M:%S")
35 ch.setFormatter(formatter)
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080036
37
Yulia Portnova7ddfa732015-02-24 17:32:58 +020038def format_result(res, formatter):
koder aka kdanilove21d7472015-02-14 19:02:04 -080039 data = "\n{0}\n".format("=" * 80)
40 data += pprint.pformat(res) + "\n"
41 data += "{0}\n".format("=" * 80)
koder aka kdanilovfe056622015-02-19 08:46:15 -080042 templ = "{0}\n\n====> {1}\n\n{2}\n\n"
Yulia Portnova7ddfa732015-02-24 17:32:58 +020043 return templ.format(data, formatter(res), "=" * 80)
koder aka kdanilove21d7472015-02-14 19:02:04 -080044
45
koder aka kdanilov5d589b42015-03-26 12:25:51 +020046def connect_one(node):
47 try:
koder aka kdanilov2c473092015-03-29 17:12:13 +030048 ssh_pref = "ssh://"
49 if node.conn_url.startswith(ssh_pref):
50 url = node.conn_url[len(ssh_pref):]
51 node.connection = ssh_utils.connect(url)
52 else:
53 raise ValueError("Unknown url type {0}".format(node.conn_url))
koder aka kdanilov3a6633e2015-03-26 18:20:00 +020054 except Exception:
koder aka kdanilov2c473092015-03-29 17:12:13 +030055 logger.exception("During connect to {0}".format(node))
koder aka kdanilov5d589b42015-03-26 12:25:51 +020056
57
58def connect_all(nodes):
koder aka kdanilov2c473092015-03-29 17:12:13 +030059 logger.info("Connecting to nodes")
60 with ThreadPoolExecutor(32) as pool:
61 list(pool.map(connect_one, nodes))
koder aka kdanilovda45e882015-04-06 02:24:42 +030062 logger.info("All nodes connected successfully")
koder aka kdanilov2c473092015-03-29 17:12:13 +030063
64
65def save_sensors_data(q):
66 logger.info("Start receiving sensors data")
koder aka kdanilovda45e882015-04-06 02:24:42 +030067 sensor_data = []
koder aka kdanilov2c473092015-03-29 17:12:13 +030068 while True:
69 val = q.get()
70 if val is None:
koder aka kdanilovda45e882015-04-06 02:24:42 +030071 print sensor_data
72 q.put(sensor_data)
koder aka kdanilov2c473092015-03-29 17:12:13 +030073 break
koder aka kdanilovda45e882015-04-06 02:24:42 +030074 sensor_data.append(val)
koder aka kdanilov2c473092015-03-29 17:12:13 +030075 logger.info("Sensors thread exits")
76
77
78def test_thread(test, node, barrier):
79 try:
80 logger.debug("Run preparation for {0}".format(node.conn_url))
81 test.pre_run(node.connection)
82 logger.debug("Run test for {0}".format(node.conn_url))
83 test.run(node.connection, barrier)
84 except:
85 logger.exception("In test {0} for node {1}".format(test, node))
86
87
88def run_tests(config, nodes):
89 tool_type_mapper = {
90 "io": IOPerfTest,
91 "pgbench": PgBenchTest,
92 }
93
94 test_nodes = [node for node in nodes
95 if 'testnode' in node.roles]
96
97 res_q = Queue.Queue()
98
koder aka kdanilovda45e882015-04-06 02:24:42 +030099 for test in config['tests']:
100 for name, params in test.items():
101 logger.info("Starting {0} tests".format(name))
koder aka kdanilov2c473092015-03-29 17:12:13 +0300102
koder aka kdanilovda45e882015-04-06 02:24:42 +0300103 threads = []
104 barrier = utils.Barrier(len(test_nodes))
105 for node in test_nodes:
106 msg = "Starting {0} test on {1} node"
107 logger.debug(msg.format(name, node.conn_url))
108 test = tool_type_mapper[name](params, res_q.put)
109 th = threading.Thread(None, test_thread, None,
110 (test, node, barrier))
111 threads.append(th)
112 th.daemon = True
113 th.start()
koder aka kdanilov2c473092015-03-29 17:12:13 +0300114
koder aka kdanilovda45e882015-04-06 02:24:42 +0300115 for th in threads:
116 th.join()
koder aka kdanilov2c473092015-03-29 17:12:13 +0300117
koder aka kdanilovda45e882015-04-06 02:24:42 +0300118 results = []
119 while not res_q.empty():
120 results.append(res_q.get())
121 # logger.info("Get test result {0!r}".format(results[-1]))
122 yield name, results
koder aka kdanilov2c473092015-03-29 17:12:13 +0300123
124
125def parse_args(argv):
126 parser = argparse.ArgumentParser(
127 description="Run disk io performance test")
128
129 parser.add_argument("-l", dest='extra_logs',
130 action='store_true', default=False,
131 help="print some extra log info")
132
koder aka kdanilovda45e882015-04-06 02:24:42 +0300133 parser.add_argument("-o", '--output-dest', nargs="*")
134 parser.add_argument("config_file", nargs="?", default="config.yaml")
koder aka kdanilov2c473092015-03-29 17:12:13 +0300135
136 return parser.parse_args(argv[1:])
137
138
koder aka kdanilovda45e882015-04-06 02:24:42 +0300139def log_nodes_statistic(_, ctx):
140 nodes = ctx.nodes
koder aka kdanilov2c473092015-03-29 17:12:13 +0300141 logger.info("Found {0} nodes total".format(len(nodes)))
142 per_role = collections.defaultdict(lambda: 0)
143 for node in nodes:
144 for role in node.roles:
145 per_role[role] += 1
146
147 for role, count in sorted(per_role.items()):
148 logger.debug("Found {0} nodes with role {1}".format(count, role))
149
150
151def log_sensors_config(cfg):
koder aka kdanilov5d589b42015-03-26 12:25:51 +0200152 pass
153
154
koder aka kdanilovda45e882015-04-06 02:24:42 +0300155def connect_stage(cfg, ctx):
156 ctx.clear_calls_stack.append(disconnect_stage)
157 connect_all(ctx.nodes)
158
159
160def discover_stage(cfg, ctx):
161 if 'discover' in cfg:
162 discover_objs = [i.strip() for i in cfg['discover'].strip().split(",")]
163 ctx.nodes.extend(discover.discover(discover_objs, cfg['clouds']))
164
165 for url, roles in cfg.get('explicit_nodes', {}).items():
166 ctx.nodes.append(Node(url, roles.split(",")))
167
168
169def deploy_sensors_stage(cfg_dict, ctx):
170 ctx.clear_calls_stack.append(remove_sensors_stage)
171 if 'sensors' not in cfg_dict:
172 return
173
174 cfg = cfg_dict.get('sensors')
175 sens_cfg = []
176
177 for role, sensors_str in cfg["roles_mapping"].items():
178 sensors = [sens.strip() for sens in sensors_str.split(",")]
179
180 collect_cfg = dict((sensor, {}) for sensor in sensors)
181
182 for node in ctx.nodes:
183 if role in node.roles:
184 sens_cfg.append((node.connection, collect_cfg))
185
186 log_sensors_config(sens_cfg)
187
188 ctx.sensor_cm = start_monitoring(cfg["receiver_uri"], None,
189 connected_config=sens_cfg)
190
191 ctx.sensors_control_queue = ctx.sensor_cm.__enter__()
192
193 th = threading.Thread(None, save_sensors_data, None,
194 (ctx.sensors_control_queue,))
195 th.daemon = True
196 th.start()
197 ctx.sensor_listen_thread = th
198
199
200def remove_sensors_stage(cfg, ctx):
201 ctx.sensors_control_queue.put(None)
202 ctx.sensor_listen_thread.join()
203 ctx.sensor_data = ctx.sensors_control_queue.get()
204
205
206def run_tests_stage(cfg, ctx):
207 ctx.results = []
208
209 if 'tests' in cfg:
210 ctx.results.extend(run_tests(cfg_dict, ctx.nodes))
211
212 # if 'start_test_nodes' in opts.stages:
213 # params = cfg_dict['start_test_nodes']['openstack']
214 # for new_node in start_vms.launch_vms(params):
215 # new_node.roles.append('testnode')
216 # nodes.append(new_node)
217
218
219def disconnect_stage(cfg, ctx):
220 for node in ctx.nodes:
221 if node.connection is not None:
222 node.connection.close()
223
224
225def report_stage(cfg, ctx):
226 output_dest = cfg.get('output_dest')
227 if output_dest is not None:
228 with open(output_dest, "w") as fd:
229 data = {"sensor_data": ctx.sensor_data,
230 "results": ctx.results}
231 fd.write(json.dumps(data))
232 else:
233 print "=" * 20 + " RESULTS " + "=" * 20
234 pprint.pprint(ctx.results)
235 print "=" * 60
236
237
238def complete_log_nodes_statistic(cfg, ctx):
239 nodes = ctx.nodes
240 for node in nodes:
241 logger.debug(str(node))
242
243
244class Context(object):
245 def __init__(self):
246 self.nodes = []
247 self.clear_calls_stack = []
248
249
koder aka kdanilov3f356262015-02-13 08:06:14 -0800250def main(argv):
koder aka kdanilove06762a2015-03-22 23:32:09 +0200251 opts = parse_args(argv)
koder aka kdanilov2c473092015-03-29 17:12:13 +0300252
253 level = logging.DEBUG if opts.extra_logs else logging.WARNING
254 setup_logger(logger, level)
255
koder aka kdanilovda45e882015-04-06 02:24:42 +0300256 stages = [
257 discover_stage,
258 connect_stage,
259 complete_log_nodes_statistic,
260 # deploy_sensors_stage,
261 run_tests_stage,
262 report_stage
263 ]
koder aka kdanilov2c473092015-03-29 17:12:13 +0300264
koder aka kdanilovda45e882015-04-06 02:24:42 +0300265 ctx = Context()
266 try:
267 for stage in stages:
268 logger.info("Start {0.__name__} stage".format(stage))
269 stage(cfg_dict, ctx)
270 finally:
271 exc, cls, tb = sys.exc_info()
272 for stage in ctx.clear_calls_stack[::-1]:
273 try:
274 logger.info("Start {0.__name__} stage".format(stage))
275 stage(cfg_dict, ctx)
276 except:
277 pass
koder aka kdanilov2c473092015-03-29 17:12:13 +0300278
koder aka kdanilovda45e882015-04-06 02:24:42 +0300279 if exc is not None:
280 raise exc, cls, tb
koder aka kdanilov2c473092015-03-29 17:12:13 +0300281
koder aka kdanilove06762a2015-03-22 23:32:09 +0200282 return 0
koder aka kdanilov3f356262015-02-13 08:06:14 -0800283
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -0800284
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -0800285if __name__ == '__main__':
koder aka kdanilove06762a2015-03-22 23:32:09 +0200286 exit(main(sys.argv))