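"""Disk IO performance test runner.

Discovers or launches test nodes, optionally deploys monitoring sensors,
runs the configured io/pgbench tests on every test node and reports the
collected results.
"""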
import os
import pickle
import sys
import json
import Queue
import pprint
import logging
import argparse
import threading
import collections

from concurrent.futures import ThreadPoolExecutor

import utils
import ssh_utils
import start_vms
from nodes import discover
from nodes.node import Node
from config import cfg_dict, parse_config
from tests.itest import IOPerfTest, PgBenchTest
from sensors.api import start_monitoring


logger = logging.getLogger("io-perf-tool")


def setup_logger(logger, level=logging.DEBUG):
    logger.setLevel(level)
    ch = logging.StreamHandler()
    ch.setLevel(level)
    logger.addHandler(ch)

    log_format = '%(asctime)s - %(levelname)-6s - %(name)s - %(message)s'
    formatter = logging.Formatter(log_format, "%H:%M:%S")
    ch.setFormatter(formatter)


def format_result(res, formatter):
    data = "\n{0}\n".format("=" * 80)
    data += pprint.pformat(res) + "\n"
    data += "{0}\n".format("=" * 80)
    templ = "{0}\n\n====> {1}\n\n{2}\n\n"
    return templ.format(data, formatter(res), "=" * 80)


def connect_one(node):
    try:
        ssh_pref = "ssh://"
        if node.conn_url.startswith(ssh_pref):
            url = node.conn_url[len(ssh_pref):]
            node.connection = ssh_utils.connect(url)
        else:
            raise ValueError("Unknown url type {0}".format(node.conn_url))
    except Exception:
        logger.exception("During connect to {0}".format(node))


def connect_all(nodes):
    logger.info("Connecting to nodes")
    with ThreadPoolExecutor(32) as pool:
        list(pool.map(connect_one, nodes))
    logger.info("All nodes connected successfully")


def save_sensors_data(q):
    logger.info("Start receiving sensors data")
    sensor_data = []
    while True:
        val = q.get()
        if val is None:
            print sensor_data
            q.put(sensor_data)
            break
        sensor_data.append(val)
    logger.info("Sensors thread exits")


def test_thread(test, node, barrier):
    try:
        logger.debug("Run preparation for {0}".format(node.conn_url))
        test.pre_run(node.connection)
        logger.debug("Run test for {0}".format(node.conn_url))
        test.run(node.connection, barrier)
    except Exception:
        logger.exception("In test {0} for node {1}".format(test, node))


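# run_tests() walks cfg_dict['tests']: each named block is expected to carry
# an 'internal_tests' list whose items map a tool name ("io" or "pgbench",
# see tool_type_mapper) to that tool's parameters.  A rough sketch of the
# assumed YAML layout (block and parameter names are illustrative only):
#
#   tests:
#       some_block:
#           internal_tests:
#               - io: {...}
#               - pgbench: {...}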
def run_tests(config, nodes):
    tool_type_mapper = {
        "io": IOPerfTest,
        "pgbench": PgBenchTest,
    }

    test_nodes = [node for node in nodes
                  if 'testnode' in node.roles]

    res_q = Queue.Queue()

    for block_name in config['tests']:
        for tests_block in config['tests'][block_name]['internal_tests']:
            for name, params in tests_block.items():
                logger.info("Starting {0} tests".format(name))

                threads = []
                barrier = utils.Barrier(len(test_nodes))
                for node in test_nodes:
                    msg = "Starting {0} test on {1} node"
                    logger.debug(msg.format(name, node.conn_url))
                    test = tool_type_mapper[name](params, res_q.put)
                    th = threading.Thread(None, test_thread, None,
                                          (test, node, barrier))
                    threads.append(th)
                    th.daemon = True
                    th.start()

                for th in threads:
                    th.join()

                results = []
                while not res_q.empty():
                    results.append(res_q.get())
                    # logger.info("Get test result {0!r}".format(results[-1]))
                yield name, results


def parse_args(argv):
    parser = argparse.ArgumentParser(
        description="Run disk io performance test")

    parser.add_argument("-l", dest='extra_logs',
                        action='store_true', default=False,
                        help="print some extra log info")

    parser.add_argument("-b", '--build_description',
                        type=str, default="Build info")
    parser.add_argument("-i", '--build_id', type=str, default="id")
    parser.add_argument("-t", '--build_type', type=str, default="GA")
    parser.add_argument("-u", '--username', type=str, default="admin")
    parser.add_argument("-o", '--output-dest', nargs="*")
    parser.add_argument("config_file", nargs="?", default="config.yaml")

    return parser.parse_args(argv[1:])


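# Example invocation of the command line defined in parse_args() above
# (the script file name is assumed; adjust to the real entry point):
#
#   python run_test.py -l -b "nightly run" -i 1234 -t GA -u admin config.yaml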
def log_nodes_statistic(_, ctx):
    nodes = ctx.nodes
    logger.info("Found {0} nodes total".format(len(nodes)))
    per_role = collections.defaultdict(lambda: 0)
    for node in nodes:
        for role in node.roles:
            per_role[role] += 1

    for role, count in sorted(per_role.items()):
        logger.debug("Found {0} nodes with role {1}".format(count, role))


def log_sensors_config(cfg):
    pass


def connect_stage(cfg, ctx):
    ctx.clear_calls_stack.append(disconnect_stage)
    connect_all(ctx.nodes)


def discover_stage(cfg, ctx):
    if 'discover' in cfg:
        discover_objs = [i.strip() for i in cfg['discover'].strip().split(",")]
        ctx.nodes.extend(discover.discover(discover_objs, cfg['clouds']))

    for url, roles in cfg.get('explicit_nodes', {}).items():
        ctx.nodes.append(Node(url, roles.split(",")))


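# deploy_sensors_stage() expects an optional 'sensors' section in the config:
# 'receiver_uri' is the address the monitoring receiver listens on and
# 'roles_mapping' maps a node role to a comma-separated list of sensor names.
# A sketch of the assumed layout (values are placeholders, not real names):
#
#   sensors:
#       receiver_uri: <uri of the sensors receiver>
#       roles_mapping:
#           testnode: <sensor1>, <sensor2>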
def deploy_sensors_stage(cfg_dict, ctx):
    ctx.clear_calls_stack.append(remove_sensors_stage)
    if 'sensors' not in cfg_dict:
        return

    cfg = cfg_dict.get('sensors')
    sens_cfg = []

    for role, sensors_str in cfg["roles_mapping"].items():
        sensors = [sens.strip() for sens in sensors_str.split(",")]

        collect_cfg = dict((sensor, {}) for sensor in sensors)

        for node in ctx.nodes:
            if role in node.roles:
                sens_cfg.append((node.connection, collect_cfg))

    log_sensors_config(sens_cfg)

    ctx.sensor_cm = start_monitoring(cfg["receiver_uri"], None,
                                     connected_config=sens_cfg)

    ctx.sensors_control_queue = ctx.sensor_cm.__enter__()

    th = threading.Thread(None, save_sensors_data, None,
                          (ctx.sensors_control_queue,))
    th.daemon = True
    th.start()
    ctx.sensor_listen_thread = th


def remove_sensors_stage(cfg, ctx):
    ctx.sensors_control_queue.put(None)
    ctx.sensor_listen_thread.join()
    ctx.sensor_data = ctx.sensors_control_queue.get()


def run_all_test(cfg, ctx, store_nodes):
    ctx.results = []

    if 'start_test_nodes' in cfg['tests']:
        params = cfg['tests']['start_test_nodes']['openstack']
        for new_node in start_vms.launch_vms(params):
            new_node.roles.append('testnode')
            ctx.nodes.append(new_node)

    if 'tests' in cfg:
        store_nodes(ctx.nodes)
        ctx.results.extend(run_tests(cfg, ctx.nodes))


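# vm_journal.log is a pickle journal of the test VMs spawned by
# run_all_test(): store_nodes() writes it, while shut_down_vms() and
# clear_enviroment() read it so that a later run can remove VMs left
# behind by a previous, interrupted run.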
def shut_down_vms(cfg, ctx):
    with open('vm_journal.log', 'rb') as f:
        nodes = pickle.load(f)

    for node in nodes:
        logger.info("Node " + str(node) + " has been loaded")

    logger.info("Removing nodes")
    start_vms.clear_nodes()
    logger.info("Nodes have been removed")


def store_nodes(nodes):
    with open('vm_journal.log', 'wb') as f:
        pickle.dump(nodes, f)
        for node in nodes:
            logger.info("Node " + str(node) + " has been stored")


def clear_enviroment(cfg, ctx):
    if os.path.exists('vm_journal.log'):
        shut_down_vms(cfg, ctx)
        os.remove('vm_journal.log')


def run_tests_stage(cfg, ctx):
    # clear nodes that might have been left over from a previous test run
    clear_enviroment(cfg, ctx)
    ctx.clear_calls_stack.append(shut_down_vms)
    run_all_test(cfg, ctx, store_nodes)


def disconnect_stage(cfg, ctx):
    for node in ctx.nodes:
        if node.connection is not None:
            node.connection.close()


def report_stage(cfg, ctx):
    output_dest = cfg.get('output_dest')
    if output_dest is not None:
        with open(output_dest, "w") as fd:
            # sensors may not have been deployed, so sensor_data can be absent
            data = {"sensor_data": getattr(ctx, "sensor_data", None),
                    "results": ctx.results}
            fd.write(json.dumps(data))
    else:
        print "=" * 20 + " RESULTS " + "=" * 20
        pprint.pprint(ctx.results)
        print "=" * 60


def complete_log_nodes_statistic(cfg, ctx):
    nodes = ctx.nodes
    for node in nodes:
        logger.debug(str(node))


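# Context carries the shared state between stages.  Every stage is a callable
# taking (cfg_dict, ctx); stages that need teardown push their counterpart
# onto ctx.clear_calls_stack, which main() unwinds in reverse order in its
# finally block even when a stage fails.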
class Context(object):
    def __init__(self):
        self.build_meta = {}
        self.nodes = []
        self.clear_calls_stack = []


def load_config(path):
    global cfg_dict
    cfg_dict = parse_config(path)


def main(argv):
    opts = parse_args(argv)

    level = logging.DEBUG if opts.extra_logs else logging.WARNING
    setup_logger(logger, level)

    stages = [
        discover_stage,
        connect_stage,
        complete_log_nodes_statistic,
        # deploy_sensors_stage,
        run_tests_stage,
        report_stage
    ]

    load_config(opts.config_file)

    ctx = Context()
    ctx.build_meta['build_id'] = opts.build_id
    ctx.build_meta['build_descrption'] = opts.build_description
    ctx.build_meta['build_type'] = opts.build_type
    ctx.build_meta['username'] = opts.username
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.FileHandler('log.txt'))
    try:
        for stage in stages:
            logger.info("Start {0.__name__} stage".format(stage))
            print "Start {0.__name__} stage".format(stage)
            stage(cfg_dict, ctx)
    finally:
        exc_type, exc_val, tb = sys.exc_info()
        for stage in ctx.clear_calls_stack[::-1]:
            try:
                logger.info("Start {0.__name__} stage".format(stage))
                stage(cfg_dict, ctx)
            except Exception:
                pass

        if exc_type is not None:
            raise exc_type, exc_val, tb

    return 0


if __name__ == '__main__':
    exit(main(sys.argv))