koder aka kdanilov | 7acd6bd | 2015-02-12 14:28:30 -0800 | [diff] [blame] | 1 | import sys |
koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame^] | 2 | import Queue |
koder aka kdanilov | 7acd6bd | 2015-02-12 14:28:30 -0800 | [diff] [blame] | 3 | import pprint |
koder aka kdanilov | e21d747 | 2015-02-14 19:02:04 -0800 | [diff] [blame] | 4 | import logging |
koder aka kdanilov | 7acd6bd | 2015-02-12 14:28:30 -0800 | [diff] [blame] | 5 | import argparse |
koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame^] | 6 | import threading |
| 7 | import collections |
koder aka kdanilov | 7acd6bd | 2015-02-12 14:28:30 -0800 | [diff] [blame] | 8 | |
koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame^] | 9 | from concurrent.futures import ThreadPoolExecutor |
| 10 | |
| 11 | import utils |
koder aka kdanilov | e06762a | 2015-03-22 23:32:09 +0200 | [diff] [blame] | 12 | import ssh_utils |
koder aka kdanilov | e06762a | 2015-03-22 23:32:09 +0200 | [diff] [blame] | 13 | from nodes import discover |
koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame^] | 14 | from nodes.node import Node |
Yulia Portnova | 0e64ea2 | 2015-03-20 17:27:22 +0200 | [diff] [blame] | 15 | from config import cfg_dict |
koder aka kdanilov | e06762a | 2015-03-22 23:32:09 +0200 | [diff] [blame] | 16 | from itest import IOPerfTest, PgBenchTest |
Yulia Portnova | 7ddfa73 | 2015-02-24 17:32:58 +0200 | [diff] [blame] | 17 | |
koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame^] | 18 | from sensors.api import start_monitoring |
| 19 | |
| 20 | |
koder aka kdanilov | e21d747 | 2015-02-14 19:02:04 -0800 | [diff] [blame] | 21 | logger = logging.getLogger("io-perf-tool") |
koder aka kdanilov | e21d747 | 2015-02-14 19:02:04 -0800 | [diff] [blame] | 22 | |
| 23 | |
def setup_logger(logger, level=logging.DEBUG):
    """Configure *logger* to emit to stderr with a short timestamped format.

    Both the logger itself and the attached StreamHandler are set to
    *level*, so messages below it are dropped as early as possible.
    """
    log_format = '%(asctime)s - %(levelname)-6s - %(name)s - %(message)s'
    formatter = logging.Formatter(log_format,
                                  "%H:%M:%S")

    handler = logging.StreamHandler()
    handler.setLevel(level)
    handler.setFormatter(formatter)

    logger.setLevel(level)
    logger.addHandler(handler)
koder aka kdanilov | 7acd6bd | 2015-02-12 14:28:30 -0800 | [diff] [blame] | 34 | |
| 35 | |
def format_result(res, formatter):
    """Return *res* pretty-printed between ruler lines, followed by the
    one-line summary produced by *formatter*.
    """
    sep = "=" * 80
    dumped = pprint.pformat(res)
    data = "\n{0}\n{1}\n{0}\n".format(sep, dumped)
    return "{0}\n\n====> {1}\n\n{2}\n\n".format(data, formatter(res), sep)
koder aka kdanilov | e21d747 | 2015-02-14 19:02:04 -0800 | [diff] [blame] | 42 | |
| 43 | |
def connect_one(node):
    """Open a connection for *node* and store it on node.connection.

    Only ssh:// URLs are supported. Failures are logged and swallowed
    (best effort) so one bad node does not abort the whole batch.
    """
    ssh_pref = "ssh://"
    try:
        if not node.conn_url.startswith(ssh_pref):
            raise ValueError("Unknown url type {0}".format(node.conn_url))

        node.connection = ssh_utils.connect(node.conn_url[len(ssh_pref):])
    except Exception:
        logger.exception("During connect to {0}".format(node))
koder aka kdanilov | 5d589b4 | 2015-03-26 12:25:51 +0200 | [diff] [blame] | 54 | |
| 55 | |
def connect_all(nodes):
    """Connect to every node in *nodes* concurrently (up to 32 at once)."""
    logger.info("Connecting to nodes")
    with ThreadPoolExecutor(32) as pool:
        # Consume the iterator so all connect_one calls actually run
        # (and finish) before the pool shuts down.
        for _ in pool.map(connect_one, nodes):
            pass
| 60 | |
| 61 | |
def save_sensors_data(q):
    """Drain sensor values from queue *q* until a None sentinel arrives.

    Values are currently discarded; this thread exists so the sensor
    receiver queue never fills up.
    """
    logger.info("Start receiving sensors data")
    for val in iter(q.get, None):
        # logger.debug("Sensors -> {0!r}".format(val))
        pass
    logger.info("Sensors thread exits")
| 70 | |
| 71 | |
def test_thread(test, node, barrier):
    """Thread target: prepare and run *test* against *node*.

    Failures are logged rather than propagated — this runs in a daemon
    thread with no caller to receive the exception.
    """
    try:
        logger.debug("Run preparation for {0}".format(node.conn_url))
        test.pre_run(node.connection)
        logger.debug("Run test for {0}".format(node.conn_url))
        test.run(node.connection, barrier)
    # Was a bare `except:`, which also swallows SystemExit and
    # KeyboardInterrupt; Exception is the widest we should catch here.
    except Exception:
        logger.exception("In test {0} for node {1}".format(test, node))
| 80 | |
| 81 | |
def run_tests(config, nodes):
    """Run each configured test suite on every node with the 'testnode' role.

    For each entry in config['tests'] one thread per test node is started;
    a barrier synchronizes the actual test start across nodes, and results
    are collected through a shared queue and logged.
    """
    # Maps a test name from the config to its implementation class.
    tool_type_mapper = {
        "io": IOPerfTest,
        "pgbench": PgBenchTest,
    }

    # Only nodes explicitly marked as test nodes execute workloads.
    test_nodes = [node for node in nodes
                  if 'testnode' in node.roles]

    # All test threads push their results here (via res_q.put below).
    res_q = Queue.Queue()

    for name, params in config['tests'].items():
        logger.info("Starting {0} tests".format(name))

        threads = []
        # Barrier makes all nodes begin the measured run simultaneously.
        barrier = utils.Barrier(len(test_nodes))
        for node in test_nodes:
            msg = "Starting {0} test on {1} node"
            logger.debug(msg.format(name, node.conn_url))
            test = tool_type_mapper[name](params, res_q.put)
            th = threading.Thread(None, test_thread, None,
                                  (test, node, barrier))
            threads.append(th)
            # Daemon threads: an abandoned test cannot block interpreter exit.
            th.daemon = True
            th.start()

        for th in threads:
            th.join()

        # All threads are done, so empty() is reliable here.
        while not res_q.empty():
            logger.info("Get test result {0!r}".format(res_q.get()))
| 113 | |
| 114 | |
def parse_args(argv):
    """Parse command-line options; argv[0] (the program name) is skipped.

    Returns a namespace with `extra_logs` (bool) and `stages`
    (non-empty list of stage names).
    """
    parser = argparse.ArgumentParser(
        description="Run disk io performance test")

    parser.add_argument("-l", dest='extra_logs',
                        action='store_true', default=False,
                        help="print some extra log info")

    stage_names = ["discover", "connect", "start_new_nodes",
                   "deploy_sensors", "run_tests"]
    parser.add_argument('stages', nargs="+", choices=stage_names)

    return parser.parse_args(argv[1:])
| 128 | |
| 129 | |
def log_nodes_statistic(nodes):
    """Log the total node count and a per-role breakdown."""
    logger.info("Found {0} nodes total".format(len(nodes)))

    per_role = collections.Counter()
    for node in nodes:
        per_role.update(node.roles)

    for role, count in sorted(per_role.items()):
        logger.debug("Found {0} nodes with role {1}".format(count, role))
| 139 | |
| 140 | |
def log_sensors_config(cfg):
    # Placeholder: logging of the assembled sensor config is not
    # implemented yet; called from main() before start_monitoring.
    pass
| 143 | |
| 144 | |
def main(argv):
    """Tool entry point: execute the stages selected on the command line.

    Stages: discover nodes, connect to them, deploy monitoring sensors,
    run the configured performance tests. Returns a process exit code
    (0 on success).
    """
    opts = parse_args(argv)

    # Stay quiet unless -l was passed.
    level = logging.DEBUG if opts.extra_logs else logging.WARNING
    setup_logger(logger, level)

    nodes = []

    if 'discover' in opts.stages:
        logger.info("Start node discovery")
        nodes = discover.discover(cfg_dict.get('discover'))

    # Nodes listed directly in the config are added on top of whatever
    # discovery produced; roles come as a comma-separated string.
    if 'explicit_nodes' in cfg_dict:
        for url, roles in cfg_dict['explicit_nodes'].items():
            nodes.append(Node(url, roles.split(",")))

    log_nodes_statistic(nodes)

    if 'connect' in opts.stages:
        connect_all(nodes)

    if 'deploy_sensors' in opts.stages:
        logger.info("Deploing sensors")
        cfg = cfg_dict.get('sensors')
        sens_cfg = []

        # Build (connection, sensor-config) pairs: every node whose roles
        # match a roles_mapping entry receives that entry's sensor set.
        for role, sensors_str in cfg["roles_mapping"].items():
            sensors = [sens.strip() for sens in sensors_str.split(",")]

            # Each sensor currently gets an empty per-sensor option dict.
            collect_cfg = dict((sensor, {}) for sensor in sensors)

            for node in nodes:
                if role in node.roles:
                    # NOTE(review): assumes node.connection was set by the
                    # 'connect' stage — TODO confirm ordering is enforced.
                    sens_cfg.append((node.connection, collect_cfg))

        log_sensors_config(sens_cfg)

        sensor_cm = start_monitoring(cfg["receiver_uri"], None,
                                     connected_config=sens_cfg)

        with sensor_cm as sensors_control_queue:
            # Drain sensor data in the background while tests run.
            th = threading.Thread(None, save_sensors_data, None,
                                  (sensors_control_queue,))
            th.daemon = True
            th.start()

            # TODO: wait till all nodes start to send sensors data

            if 'run_tests' in opts.stages:
                run_tests(cfg_dict, nodes)

            # None is the sentinel that tells save_sensors_data to stop.
            sensors_control_queue.put(None)
            th.join()
    elif 'run_tests' in opts.stages:
        # Tests without sensor monitoring.
        run_tests(cfg_dict, nodes)

    logger.info("Disconnecting")
    for node in nodes:
        # NOTE(review): assumes every node has an open connection; if the
        # 'connect' stage was skipped or connect_one failed, this may
        # raise — TODO confirm Node's default connection value.
        node.connection.close()

    return 0
koder aka kdanilov | 3f35626 | 2015-02-13 08:06:14 -0800 | [diff] [blame] | 206 | |
koder aka kdanilov | 7acd6bd | 2015-02-12 14:28:30 -0800 | [diff] [blame] | 207 | |
if __name__ == '__main__':
    # sys.exit instead of the site-provided exit() builtin: exit() is
    # intended for interactive use and may be absent when the `site`
    # module is not loaded.
    sys.exit(main(sys.argv))