blob: d1a9f4fd99ec9c1fe9d98a2e4cb2943e269b3532 [file] [log] [blame]
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08001import sys
koder aka kdanilov2c473092015-03-29 17:12:13 +03002import Queue
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08003import pprint
koder aka kdanilove21d7472015-02-14 19:02:04 -08004import logging
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08005import argparse
koder aka kdanilov2c473092015-03-29 17:12:13 +03006import threading
7import collections
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08008
koder aka kdanilov2c473092015-03-29 17:12:13 +03009from concurrent.futures import ThreadPoolExecutor
10
11import utils
koder aka kdanilove06762a2015-03-22 23:32:09 +020012import ssh_utils
koder aka kdanilove06762a2015-03-22 23:32:09 +020013from nodes import discover
koder aka kdanilov2c473092015-03-29 17:12:13 +030014from nodes.node import Node
Yulia Portnova0e64ea22015-03-20 17:27:22 +020015from config import cfg_dict
koder aka kdanilove06762a2015-03-22 23:32:09 +020016from itest import IOPerfTest, PgBenchTest
Yulia Portnova7ddfa732015-02-24 17:32:58 +020017
koder aka kdanilov2c473092015-03-29 17:12:13 +030018from sensors.api import start_monitoring
19
20
koder aka kdanilove21d7472015-02-14 19:02:04 -080021logger = logging.getLogger("io-perf-tool")
koder aka kdanilove21d7472015-02-14 19:02:04 -080022
23
def setup_logger(logger, level=logging.DEBUG):
    """Configure *logger* to emit to stderr at *level*.

    Attaches a StreamHandler whose records are rendered as
    "HH:MM:SS - LEVEL - name - message"; both the logger and the
    handler are set to the same threshold.
    """
    fmt = logging.Formatter(
        '%(asctime)s - %(levelname)-6s - %(name)s - %(message)s',
        "%H:%M:%S")

    handler = logging.StreamHandler()
    handler.setLevel(level)
    handler.setFormatter(fmt)

    logger.setLevel(level)
    logger.addHandler(handler)
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080034
35
def format_result(res, formatter):
    """Return *res* rendered twice: pretty-printed raw, then through
    *formatter*, with 80-char '=' rulers separating the sections."""
    ruler = "=" * 80
    data = "\n{0}\n{1}\n{0}\n".format(ruler, pprint.pformat(res))
    templ = "{0}\n\n====> {1}\n\n{2}\n\n"
    return templ.format(data, formatter(res), ruler)
koder aka kdanilove21d7472015-02-14 19:02:04 -080042
43
def connect_one(node):
    """Open the connection for a single *node*.

    Only "ssh://" urls are supported; on success the live connection
    is stored on node.connection.  Any failure (bad url type,
    connect error) is logged and swallowed, leaving node.connection
    untouched.
    """
    prefix = "ssh://"
    try:
        if not node.conn_url.startswith(prefix):
            raise ValueError("Unknown url type {0}".format(node.conn_url))
        node.connection = ssh_utils.connect(node.conn_url[len(prefix):])
    except Exception:
        logger.exception("During connect to {0}".format(node))
koder aka kdanilov5d589b42015-03-26 12:25:51 +020054
55
def connect_all(nodes):
    """Connect to every node in *nodes* concurrently (32 workers).

    Per-node failures are handled inside connect_one, so this never
    raises for an unreachable node.
    """
    logger.info("Connecting to nodes")
    with ThreadPoolExecutor(32) as pool:
        # drain the lazy map iterator so every connect actually runs
        for _ in pool.map(connect_one, nodes):
            pass
60
61
def save_sensors_data(q):
    """Consume sensor readings from queue *q* until a None sentinel
    arrives (posted by main() to request shutdown)."""
    logger.info("Start receiving sensors data")
    val = q.get()
    while val is not None:
        # logger.debug("Sensors -> {0!r}".format(val))
        val = q.get()
    logger.info("Sensors thread exits")
70
71
def test_thread(test, node, barrier):
    """Thread body: prepare then run *test* against *node*.

    *barrier* lets all per-node test threads start the measured run
    simultaneously (passed through to test.run).  Errors are logged
    and swallowed so one failing node does not kill the other test
    threads.
    """
    try:
        logger.debug("Run preparation for {0}".format(node.conn_url))
        test.pre_run(node.connection)
        logger.debug("Run test for {0}".format(node.conn_url))
        test.run(node.connection, barrier)
    except Exception:
        # was a bare "except:", which would also trap SystemExit and
        # KeyboardInterrupt; narrowed to match connect_one's handling
        logger.exception("In test {0} for node {1}".format(test, node))
80
81
def run_tests(config, nodes):
    """Run each test described in config['tests'] on every test node.

    For every configured test a thread per 'testnode'-role node is
    started; a Barrier sized to the node count synchronizes the actual
    run across nodes.  Results are pushed by the tests into a shared
    queue (via res_q.put) and logged once all threads join.
    """
    # maps the config test name to the test implementation class
    tool_type_mapper = {
        "io": IOPerfTest,
        "pgbench": PgBenchTest,
    }

    # only nodes explicitly marked as test nodes take part
    test_nodes = [node for node in nodes
                  if 'testnode' in node.roles]

    res_q = Queue.Queue()

    for name, params in config['tests'].items():
        logger.info("Starting {0} tests".format(name))

        threads = []
        # one barrier slot per participating node, so all test runs
        # begin at the same moment
        barrier = utils.Barrier(len(test_nodes))
        for node in test_nodes:
            msg = "Starting {0} test on {1} node"
            logger.debug(msg.format(name, node.conn_url))
            # each test instance reports results via res_q.put
            test = tool_type_mapper[name](params, res_q.put)
            th = threading.Thread(None, test_thread, None,
                                  (test, node, barrier))
            threads.append(th)
            # daemon: don't block interpreter exit on a hung test
            th.daemon = True
            th.start()

        for th in threads:
            th.join()

        # drain everything the test threads produced for this test
        while not res_q.empty():
            logger.info("Get test result {0!r}".format(res_q.get()))
113
114
def parse_args(argv):
    """Parse the command line (argv[0], the program name, is skipped).

    Returns a namespace with .extra_logs (bool) and .stages (list of
    stage names to execute, in the order given).
    """
    descr = "Run disk io performance test"
    parser = argparse.ArgumentParser(description=descr)

    parser.add_argument("-l", dest='extra_logs',
                        action='store_true', default=False,
                        help="print some extra log info")

    stage_names = ["discover", "connect", "start_new_nodes",
                   "deploy_sensors", "run_tests"]
    parser.add_argument('stages', nargs="+", choices=stage_names)

    return parser.parse_args(argv[1:])
128
129
def log_nodes_statistic(nodes):
    """Log how many nodes were found, in total and per role."""
    logger.info("Found {0} nodes total".format(len(nodes)))

    role_counts = collections.Counter()
    for node in nodes:
        role_counts.update(node.roles)

    for role, count in sorted(role_counts.items()):
        logger.debug("Found {0} nodes with role {1}".format(count, role))
139
140
def log_sensors_config(cfg):
    # Stub: logging of the assembled sensor configuration is not
    # implemented yet; kept so the call site in main()'s
    # deploy_sensors stage stays in place.
    pass
143
144
def main(argv):
    """Entry point: run the stages requested on the command line.

    Stages (any subset, see parse_args): discover nodes, connect to
    them, deploy sensors (optionally running tests while collecting),
    or run tests alone.  Returns the process exit code (0 on success).
    """
    opts = parse_args(argv)

    level = logging.DEBUG if opts.extra_logs else logging.WARNING
    setup_logger(logger, level)

    nodes = []

    if 'discover' in opts.stages:
        logger.info("Start node discovery")
        nodes = discover.discover(cfg_dict.get('discover'))

        # nodes pinned directly in the config, in addition to the
        # discovered ones
        if 'explicit_nodes' in cfg_dict:
            for url, roles in cfg_dict['explicit_nodes'].items():
                nodes.append(Node(url, roles.split(",")))

        log_nodes_statistic(nodes)

    if 'connect' in opts.stages:
        connect_all(nodes)

    if 'deploy_sensors' in opts.stages:
        logger.info("Deploing sensors")
        cfg = cfg_dict.get('sensors')
        sens_cfg = []

        # expand each role's comma-separated sensor list onto every
        # node carrying that role
        for role, sensors_str in cfg["roles_mapping"].items():
            sensors = [sens.strip() for sens in sensors_str.split(",")]

            collect_cfg = dict((sensor, {}) for sensor in sensors)

            for node in nodes:
                if role in node.roles:
                    sens_cfg.append((node.connection, collect_cfg))

        log_sensors_config(sens_cfg)

        sensor_cm = start_monitoring(cfg["receiver_uri"], None,
                                     connected_config=sens_cfg)

        with sensor_cm as sensors_control_queue:
            th = threading.Thread(None, save_sensors_data, None,
                                  (sensors_control_queue,))
            th.daemon = True
            th.start()

            # TODO: wait till all nodes start to send sensors data

            if 'run_tests' in opts.stages:
                run_tests(cfg_dict, nodes)

            # None is the sentinel that makes save_sensors_data exit
            sensors_control_queue.put(None)
            th.join()
    elif 'run_tests' in opts.stages:
        run_tests(cfg_dict, nodes)

    logger.info("Disconnecting")
    for node in nodes:
        # connect_one swallows connect failures without setting
        # node.connection, and the 'connect' stage may be skipped
        # entirely -- closing unconditionally crashed on such nodes
        conn = getattr(node, "connection", None)
        if conn is not None:
            conn.close()

    return 0
koder aka kdanilov3f356262015-02-13 08:06:14 -0800206
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -0800207
if __name__ == '__main__':
    # sys.exit is the canonical way to propagate the exit status;
    # the built-in exit() is an interactive helper injected by the
    # "site" module and is not guaranteed to exist (e.g. python -S).
    sys.exit(main(sys.argv))