blob: 341119ec4a7ce06b4c710020547bfeb9ddf21df5 [file] [log] [blame]
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08001import sys
koder aka kdanilovda45e882015-04-06 02:24:42 +03002import json
koder aka kdanilov2c473092015-03-29 17:12:13 +03003import Queue
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08004import pprint
koder aka kdanilove21d7472015-02-14 19:02:04 -08005import logging
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08006import argparse
koder aka kdanilov2c473092015-03-29 17:12:13 +03007import threading
8import collections
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08009
koder aka kdanilov2c473092015-03-29 17:12:13 +030010from concurrent.futures import ThreadPoolExecutor
11
12import utils
koder aka kdanilove06762a2015-03-22 23:32:09 +020013import ssh_utils
koder aka kdanilovda45e882015-04-06 02:24:42 +030014import start_vms
koder aka kdanilove06762a2015-03-22 23:32:09 +020015from nodes import discover
koder aka kdanilov2c473092015-03-29 17:12:13 +030016from nodes.node import Node
Yulia Portnova0e64ea22015-03-20 17:27:22 +020017from config import cfg_dict
koder aka kdanilovda45e882015-04-06 02:24:42 +030018from tests.itest import IOPerfTest, PgBenchTest
Yulia Portnova7ddfa732015-02-24 17:32:58 +020019
koder aka kdanilov2c473092015-03-29 17:12:13 +030020from sensors.api import start_monitoring
21
22
koder aka kdanilove21d7472015-02-14 19:02:04 -080023logger = logging.getLogger("io-perf-tool")
koder aka kdanilove21d7472015-02-14 19:02:04 -080024
25
def setup_logger(logger, level=logging.DEBUG):
    """Attach a timestamped stream handler to *logger*.

    Both the logger itself and the newly created handler are set to
    *level*, so records below it are dropped at either point.
    """
    handler = logging.StreamHandler()
    handler.setLevel(level)

    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)-6s - %(name)s - %(message)s',
        "%H:%M:%S")
    handler.setFormatter(formatter)

    logger.setLevel(level)
    logger.addHandler(handler)
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080036
37
def format_result(res, formatter):
    """Render *res* twice inside '=' rulers: raw via pprint, then the
    summary produced by *formatter*.
    """
    ruler = "=" * 80
    raw_dump = "\n{0}\n{1}\n{0}\n".format(ruler, pprint.pformat(res))
    return "{0}\n\n====> {1}\n\n{2}\n\n".format(raw_dump,
                                                formatter(res),
                                                ruler)
koder aka kdanilove21d7472015-02-14 19:02:04 -080044
45
def connect_one(node):
    """Open an ssh connection for *node* and store it on the node.

    Failures (including unsupported URL schemes) are logged and
    swallowed so a single bad node does not abort the whole pool.
    """
    prefix = "ssh://"
    try:
        if not node.conn_url.startswith(prefix):
            raise ValueError("Unknown url type {0}".format(node.conn_url))
        node.connection = ssh_utils.connect(node.conn_url[len(prefix):])
    except Exception:
        logger.exception("During connect to {0}".format(node))
koder aka kdanilov5d589b42015-03-26 12:25:51 +020056
57
def connect_all(nodes):
    """Connect to every node concurrently using a 32-worker pool."""
    logger.info("Connecting to nodes")
    with ThreadPoolExecutor(32) as pool:
        # drain the iterator so every connect_one actually runs
        for _ in pool.map(connect_one, nodes):
            pass
    logger.info("All nodes connected successfully")
koder aka kdanilov2c473092015-03-29 17:12:13 +030063
64
def save_sensors_data(q):
    """Accumulate sensor readings from queue *q* until a None sentinel.

    Runs as a daemon thread (started in deploy_sensors_stage). When the
    sentinel arrives, the collected list is pushed back onto *q* so the
    shutdown path (remove_sensors_stage) can fetch it with q.get().
    """
    logger.info("Start receiving sensors data")
    sensor_data = []
    while True:
        val = q.get()
        if val is None:
            # fix: dropped the stray debug `print sensor_data` that
            # dumped the entire collected dataset to stdout on shutdown
            q.put(sensor_data)
            break
        sensor_data.append(val)
    logger.info("Sensors thread exits")
76
77
def test_thread(test, node, barrier):
    """Thread target: prepare and run one *test* instance on *node*.

    Exceptions are logged and swallowed so one failing node does not
    take down the other workers; *barrier* keeps nodes in lockstep.
    """
    try:
        logger.debug("Run preparation for {0}".format(node.conn_url))
        test.pre_run(node.connection)
        logger.debug("Run test for {0}".format(node.conn_url))
        test.run(node.connection, barrier)
    # fix: bare `except:` also caught SystemExit/KeyboardInterrupt;
    # narrow to Exception so interpreter-level signals still propagate
    except Exception:
        logger.exception("In test {0} for node {1}".format(test, node))
86
87
def run_tests(config, nodes):
    """Run every test section from config['tests'] on all test nodes.

    Generator: yields (test_name, results) once per section, after all
    node threads for that section have joined. Each test instance
    reports results by calling res_q.put; the queue is drained into a
    plain list before yielding.
    """
    tool_type_mapper = {
        "io": IOPerfTest,
        "pgbench": PgBenchTest,
    }

    test_nodes = [node for node in nodes
                  if 'testnode' in node.roles]

    res_q = Queue.Queue()

    # fix: the outer loop variable was also named `test` and was
    # clobbered by the per-node test instance below; renamed to
    # `test_block` to remove the shadowing
    for test_block in config['tests']:
        for name, params in test_block.items():
            logger.info("Starting {0} tests".format(name))

            threads = []
            barrier = utils.Barrier(len(test_nodes))
            for node in test_nodes:
                msg = "Starting {0} test on {1} node"
                logger.debug(msg.format(name, node.conn_url))
                # fresh test object per node; all report into res_q
                test = tool_type_mapper[name](params, res_q.put)
                th = threading.Thread(None, test_thread, None,
                                      (test, node, barrier))
                threads.append(th)
                th.daemon = True
                th.start()

            for th in threads:
                th.join()

            results = []
            while not res_q.empty():
                results.append(res_q.get())
            yield name, results
koder aka kdanilov2c473092015-03-29 17:12:13 +0300123
124
def parse_args(argv):
    """Build the CLI parser and parse argv[1:] (argv[0] is the program)."""
    parser = argparse.ArgumentParser(
        description="Run disk io performance test")

    parser.add_argument("-l", dest='extra_logs',
                        action='store_true', default=False,
                        help="print some extra log info")

    parser.add_argument("-b", '--build_description',
                        type=str, default="Build info")
    parser.add_argument("-i", '--build_id', type=str, default="id")
    parser.add_argument("-t", '--build_type', type=str, default="GA")
    parser.add_argument("-u", '--username', type=str, default="admin")
    parser.add_argument("-o", '--output-dest', nargs="*")
    parser.add_argument("config_file", nargs="?", default="config.yaml")

    return parser.parse_args(argv[1:])
141
142
def log_nodes_statistic(_, ctx):
    """Log total node count plus a per-role breakdown (sorted by role)."""
    logger.info("Found {0} nodes total".format(len(ctx.nodes)))

    role_counts = collections.defaultdict(int)
    for node in ctx.nodes:
        for role in node.roles:
            role_counts[role] += 1

    for role, count in sorted(role_counts.items()):
        logger.debug("Found {0} nodes with role {1}".format(count, role))
153
154
def log_sensors_config(cfg):
    # Placeholder called from deploy_sensors_stage with the assembled
    # (connection, collect_cfg) pairs; logging not implemented yet.
    pass
157
158
def connect_stage(cfg, ctx):
    """Open connections to all discovered nodes.

    Registers disconnect_stage first so connections opened before a
    partial failure are still closed by the cleanup loop in main().
    """
    ctx.clear_calls_stack.append(disconnect_stage)
    connect_all(ctx.nodes)
162
163
def discover_stage(cfg, ctx):
    """Populate ctx.nodes from 'discover' and 'explicit_nodes' config keys."""
    if 'discover' in cfg:
        targets = [part.strip()
                   for part in cfg['discover'].strip().split(",")]
        ctx.nodes.extend(discover.discover(targets, cfg['clouds']))

    # explicit_nodes maps conn-url -> comma-separated role list
    for url, roles in cfg.get('explicit_nodes', {}).items():
        ctx.nodes.append(Node(url, roles.split(",")))
171
172
def deploy_sensors_stage(cfg_dict, ctx):
    """Deploy monitoring sensors on matching nodes and start a listener.

    Maps each role in sensors.roles_mapping to its sensor list, builds
    (connection, sensor-config) pairs for every node carrying that role,
    then enters the start_monitoring context manager and spawns a daemon
    thread draining its queue. Torn down by remove_sensors_stage.
    """
    # register cleanup before doing anything that may need undoing
    ctx.clear_calls_stack.append(remove_sensors_stage)
    if 'sensors' not in cfg_dict:
        return

    cfg = cfg_dict.get('sensors')
    sens_cfg = []

    # roles_mapping: role -> comma-separated sensor names
    for role, sensors_str in cfg["roles_mapping"].items():
        sensors = [sens.strip() for sens in sensors_str.split(",")]

        # each sensor gets an (empty) per-sensor options dict
        collect_cfg = dict((sensor, {}) for sensor in sensors)

        for node in ctx.nodes:
            if role in node.roles:
                sens_cfg.append((node.connection, collect_cfg))

    log_sensors_config(sens_cfg)

    ctx.sensor_cm = start_monitoring(cfg["receiver_uri"], None,
                                     connected_config=sens_cfg)

    # NOTE(review): the context manager is entered manually here and is
    # presumably exited via remove_sensors_stage's queue handshake —
    # __exit__ is never called explicitly; confirm in sensors.api
    ctx.sensors_control_queue = ctx.sensor_cm.__enter__()

    th = threading.Thread(None, save_sensors_data, None,
                          (ctx.sensors_control_queue,))
    th.daemon = True
    th.start()
    ctx.sensor_listen_thread = th
202
203
def remove_sensors_stage(cfg, ctx):
    """Stop the sensor listener thread and collect its accumulated data."""
    # None is the shutdown sentinel understood by save_sensors_data
    ctx.sensors_control_queue.put(None)
    ctx.sensor_listen_thread.join()
    # the listener pushes its collected list back on the same queue
    ctx.sensor_data = ctx.sensors_control_queue.get()
208
209
def run_tests_stage(cfg, ctx):
    """Run all configured tests, storing (name, results) pairs on ctx.

    No-op (but still initializes ctx.results) when the config has no
    'tests' section.
    """
    ctx.results = []

    if 'tests' in cfg:
        # fix: previously passed the module-global cfg_dict instead of
        # this stage's cfg argument; same object when invoked from
        # main(), but the stage now honours whatever config it is given
        ctx.results.extend(run_tests(cfg, ctx.nodes))
221
222
def disconnect_stage(cfg, ctx):
    """Close every node connection that was actually opened."""
    connected = (node for node in ctx.nodes
                 if node.connection is not None)
    for node in connected:
        node.connection.close()
227
228
229def report_stage(cfg, ctx):
230 output_dest = cfg.get('output_dest')
231 if output_dest is not None:
232 with open(output_dest, "w") as fd:
233 data = {"sensor_data": ctx.sensor_data,
234 "results": ctx.results}
235 fd.write(json.dumps(data))
236 else:
237 print "=" * 20 + " RESULTS " + "=" * 20
238 pprint.pprint(ctx.results)
239 print "=" * 60
240
241
def complete_log_nodes_statistic(cfg, ctx):
    """Debug-log the string form of every node in the context."""
    for node in ctx.nodes:
        logger.debug(str(node))
246
247
class Context(object):
    """Mutable bag of shared state passed between pipeline stages."""

    def __init__(self):
        # cleanup callbacks, executed in reverse order by main()'s finally
        self.clear_calls_stack = []
        # Node objects found by discover_stage / explicit config
        self.nodes = []
        # build metadata filled from the CLI options in main()
        self.build_meta = {}
253
254
def main(argv):
    """Entry point: parse args, run all stages, then run cleanup stages.

    Returns 0 on success; re-raises the first stage exception (with its
    original traceback, Python 2 three-expression raise) after cleanup.
    """
    opts = parse_args(argv)

    level = logging.DEBUG if opts.extra_logs else logging.WARNING
    setup_logger(logger, level)

    # stages run in order; each may push a cleanup callback onto
    # ctx.clear_calls_stack
    stages = [
        discover_stage,
        connect_stage,
        complete_log_nodes_statistic,
        # deploy_sensors_stage,
        run_tests_stage,
        report_stage
    ]

    ctx = Context()
    ctx.build_meta['build_id'] = opts.build_id
    # NOTE(review): key 'build_descrption' is misspelled; left as-is
    # because consumers of build_meta may rely on it — fix together
    # with all readers
    ctx.build_meta['build_descrption'] = opts.build_description
    ctx.build_meta['build_type'] = opts.build_type
    ctx.build_meta['username'] = opts.username

    try:
        for stage in stages:
            logger.info("Start {0.__name__} stage".format(stage))
            stage(cfg_dict, ctx)
    finally:
        # NOTE(review): sys.exc_info() returns (type, value, traceback),
        # so `exc` is actually the exception *type* and `cls` the
        # instance; the py2 `raise type, value, tb` below still works,
        # but the names are swapped
        exc, cls, tb = sys.exc_info()
        # run registered cleanups in reverse registration order,
        # best-effort: a failing cleanup must not mask the original error
        for stage in ctx.clear_calls_stack[::-1]:
            try:
                logger.info("Start {0.__name__} stage".format(stage))
                stage(cfg_dict, ctx)
            except:
                pass

        # re-raise the stage exception with its original traceback
        if exc is not None:
            raise exc, cls, tb

    return 0
koder aka kdanilov3f356262015-02-13 08:06:14 -0800293
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -0800294
if __name__ == '__main__':
    # fix: use sys.exit instead of the exit() builtin, which is injected
    # by the site module and is absent under `python -S` / frozen builds
    sys.exit(main(sys.argv))