| import sys |
| import json |
| import Queue |
| import pprint |
| import logging |
| import argparse |
| import threading |
| import collections |
| |
| from concurrent.futures import ThreadPoolExecutor |
| |
| import utils |
| import ssh_utils |
| import start_vms |
| from nodes import discover |
| from nodes.node import Node |
| from config import cfg_dict |
| from tests.itest import IOPerfTest, PgBenchTest |
| |
| from sensors.api import start_monitoring |
| |
| |
# Module-wide logger used by every stage and helper in this script.
logger = logging.getLogger(name="io-perf-tool")
| |
| |
def setup_logger(logger, level=logging.DEBUG):
    """Attach a timestamped StreamHandler to *logger*.

    Both the logger and the new handler are set to *level*.  Every call adds
    another handler, so calling this twice on the same logger duplicates
    each emitted record.
    """
    fmt = logging.Formatter(
        '%(asctime)s - %(levelname)-6s - %(name)s - %(message)s',
        "%H:%M:%S")
    handler = logging.StreamHandler()
    handler.setLevel(level)
    handler.setFormatter(fmt)
    logger.setLevel(level)
    logger.addHandler(handler)
| |
| |
def format_result(res, formatter):
    """Render *res* as a framed pretty-print dump followed by formatter(res).

    The pprint dump is enclosed between two 80-character '=' rules; the
    formatter output is prefixed with '====> ' and followed by a final rule.
    """
    rule = "=" * 80
    framed = "\n{0}\n{1}\n{0}\n".format(rule, pprint.pformat(res))
    return "{0}\n\n====> {1}\n\n{2}\n\n".format(framed, formatter(res), rule)
| |
| |
def connect_one(node):
    """Open an SSH connection for *node* and store it in node.connection.

    Only "ssh://" URLs are accepted; the prefix is stripped before the URL is
    handed to ssh_utils.connect.  Any failure (including an unsupported URL
    scheme) is logged and swallowed, leaving node.connection untouched.
    """
    prefix = "ssh://"
    try:
        if not node.conn_url.startswith(prefix):
            raise ValueError("Unknown url type {0}".format(node.conn_url))
        node.connection = ssh_utils.connect(node.conn_url[len(prefix):])
    except Exception:
        logger.exception("During connect to {0}".format(node))
| |
| |
def connect_all(nodes):
    """Connect to every node concurrently, using a pool of 32 threads.

    connect_one logs and swallows per-node errors, so after the pool finishes
    we check which nodes actually have a connection.  We then report either
    full success or how many nodes failed, instead of always claiming success.
    """
    # Materialise once: the nodes are iterated twice (connect, then verify).
    nodes = list(nodes)
    logger.info("Connecting to nodes")
    with ThreadPoolExecutor(32) as pool:
        list(pool.map(connect_one, nodes))

    # A node whose connect failed keeps no (or a None) connection attribute.
    failed = [node for node in nodes
              if getattr(node, 'connection', None) is None]
    if failed:
        logger.warning("Failed to connect to {0} of {1} nodes".format(
            len(failed), len(nodes)))
    else:
        logger.info("All nodes connected successfully")
| |
| |
def save_sensors_data(q):
    """Collect sensor samples from queue *q* until a None sentinel arrives.

    Every non-None item taken from *q* is appended to a list.  On receiving
    None, that list is put back onto *q* for the stopping side to read
    (remove_sensors_stage does q.get() after joining this thread), and the
    function returns.
    """
    logger.info("Start receiving sensors data")
    sensor_data = []
    while True:
        val = q.get()
        if val is None:
            # Log instead of printing so raw sample dumps do not end up
            # interleaved with the report written to stdout.
            logger.debug("Collected {0} sensor samples".format(
                len(sensor_data)))
            q.put(sensor_data)
            break
        sensor_data.append(val)
    logger.info("Sensors thread exits")
| |
| |
def test_thread(test, node, barrier):
    """Thread body: run test.pre_run and then test.run on node.connection.

    *barrier* is passed through to test.run.  Exceptions from either step are
    logged via logger.exception instead of escaping the thread.  Only
    Exception subclasses are caught, so SystemExit/KeyboardInterrupt are not
    swallowed.
    """
    try:
        logger.debug("Run preparation for {0}".format(node.conn_url))
        test.pre_run(node.connection)
        # NOTE(review): if pre_run raises, this thread never calls test.run
        # and so presumably never reaches *barrier*; depending on
        # utils.Barrier the other nodes' threads may then block — confirm.
        logger.debug("Run test for {0}".format(node.conn_url))
        test.run(node.connection, barrier)
    except Exception:
        logger.exception("In test {0} for node {1}".format(test, node))
| |
| |
def run_tests(config, nodes):
    """Run every configured test on all 'testnode' nodes; yield the results.

    config['tests'] is iterated as a sequence of {tool_name: params} mappings;
    tool_name must be "io" or "pgbench" (otherwise KeyError).  For each tool,
    one daemon thread per test node runs test_thread, with all of them sharing
    one utils.Barrier.  After every thread is joined, the results the tests
    reported through the shared queue are drained and yielded as
    (tool_name, results).
    """
    tool_type_mapper = {
        "io": IOPerfTest,
        "pgbench": PgBenchTest,
    }

    test_nodes = [node for node in nodes
                  if 'testnode' in node.roles]

    # Test instances report results by calling res_q.put.
    res_q = Queue.Queue()

    # Outer variable is `test_spec`, so it is no longer shadowed by the
    # per-node test instance created below.
    for test_spec in config['tests']:
        for name, params in test_spec.items():
            logger.info("Starting {0} tests".format(name))
            # Resolve the tool up front, so an unknown name fails before any
            # thread or barrier is created.
            test_cls = tool_type_mapper[name]

            threads = []
            barrier = utils.Barrier(len(test_nodes))
            for node in test_nodes:
                msg = "Starting {0} test on {1} node"
                logger.debug(msg.format(name, node.conn_url))
                test = test_cls(params, res_q.put)
                th = threading.Thread(None, test_thread, None,
                                      (test, node, barrier))
                threads.append(th)
                th.daemon = True
                th.start()

            # NOTE(review): joins have no timeout; a thread stuck on the
            # barrier would block here indefinitely.
            for th in threads:
                th.join()

            # All producers are finished, so empty() is reliable here.
            results = []
            while not res_q.empty():
                results.append(res_q.get())
            yield name, results
| |
| |
def parse_args(argv):
    """Parse the command line; argv[0] (the program name) is skipped.

    Returns an argparse.Namespace with:
      extra_logs  -- True when -l is given
      output_dest -- list of values after -o/--output-dest, or None
      config_file -- optional positional, defaults to "config.yaml"
    """
    p = argparse.ArgumentParser(description="Run disk io performance test")
    p.add_argument("-l", dest='extra_logs', action='store_true',
                   default=False, help="print some extra log info")
    p.add_argument("-o", '--output-dest', nargs="*")
    p.add_argument("config_file", nargs="?", default="config.yaml")
    cli_args = argv[1:]
    return p.parse_args(cli_args)
| |
| |
def log_nodes_statistic(_, ctx):
    """Log the total number of nodes and, at debug level, the count per role.

    Roles are reported in sorted order.  The first argument (stage config) is
    ignored.
    """
    all_nodes = ctx.nodes
    logger.info("Found {0} nodes total".format(len(all_nodes)))
    role_counts = collections.Counter(
        role for node in all_nodes for role in node.roles)
    for role in sorted(role_counts):
        logger.debug("Found {0} nodes with role {1}".format(
            role_counts[role], role))
| |
| |
def log_sensors_config(cfg):
    """Hook for logging the per-node sensors configuration; currently a no-op."""
    return None
| |
| |
def connect_stage(cfg, ctx):
    """Stage: open connections to all nodes in ctx.nodes.

    disconnect_stage is pushed onto the cleanup stack before connecting, so
    it still runs if connecting raises part-way through.
    """
    cleanup = ctx.clear_calls_stack
    cleanup.append(disconnect_stage)
    connect_all(ctx.nodes)
| |
| |
def discover_stage(cfg, ctx):
    """Stage: fill ctx.nodes from auto-discovery and explicitly listed nodes.

    cfg['discover'] (optional) is a comma-separated list of discovery sources
    passed to discover.discover together with cfg['clouds'].
    cfg['explicit_nodes'] (optional) maps a connection URL to a comma-separated
    role list.
    """
    def split_csv(text):
        # Strip each item and drop empties: "a, b," -> ['a', 'b'].  Role
        # names must be stripped too, or "x, testnode" would yield
        # " testnode" and never match role checks.
        return [item.strip() for item in text.split(",") if item.strip()]

    if 'discover' in cfg:
        discover_objs = split_csv(cfg['discover'])
        ctx.nodes.extend(discover.discover(discover_objs, cfg['clouds']))

    for url, roles in cfg.get('explicit_nodes', {}).items():
        ctx.nodes.append(Node(url, split_csv(roles)))
| |
| |
def deploy_sensors_stage(cfg_dict, ctx):
    """Stage: start sensor monitoring as described by cfg_dict['sensors'].

    cfg_dict['sensors']['roles_mapping'] maps a node role to a comma-separated
    list of sensor names; every node with that role gets those sensors.
    Monitoring is started via start_monitoring(receiver_uri, ...).  The queue
    it yields is drained by a daemon save_sensors_data thread.  Does nothing
    when no 'sensors' section is configured.
    """
    if 'sensors' not in cfg_dict:
        return

    cfg = cfg_dict['sensors']
    sens_cfg = []

    for role, sensors_str in cfg["roles_mapping"].items():
        sensors = [sens.strip() for sens in sensors_str.split(",")]
        # One config dict per role; it is shared by all nodes with that role.
        collect_cfg = dict((sensor, {}) for sensor in sensors)
        for node in ctx.nodes:
            if role in node.roles:
                sens_cfg.append((node.connection, collect_cfg))

    log_sensors_config(sens_cfg)

    ctx.sensor_cm = start_monitoring(cfg["receiver_uri"], None,
                                     connected_config=sens_cfg)
    ctx.sensors_control_queue = ctx.sensor_cm.__enter__()

    th = threading.Thread(None, save_sensors_data, None,
                          (ctx.sensors_control_queue,))
    th.daemon = True
    th.start()
    ctx.sensor_listen_thread = th

    # Register cleanup only now, once every attribute remove_sensors_stage
    # uses exists.  This skips it when sensors are not configured.
    ctx.clear_calls_stack.append(remove_sensors_stage)
| |
| |
def remove_sensors_stage(cfg, ctx):
    """Cleanup stage: stop sensor collection and keep the collected samples.

    Steps:
      1. Send the None sentinel to the save_sensors_data thread.
      2. Wait for that thread to exit.
      3. Store the list it puts back on the queue in ctx.sensor_data.
      4. Exit the monitoring context that deploy_sensors_stage entered.

    Does nothing if sensors were never started.
    """
    control_q = getattr(ctx, 'sensors_control_queue', None)
    if control_q is None:
        return
    try:
        control_q.put(None)
        ctx.sensor_listen_thread.join()
        # NOTE(review): the listener returns its list over the same queue;
        # if sensors can still enqueue samples at this point, get() might
        # return a sample instead of the list — confirm with sensors.api.
        ctx.sensor_data = control_q.get()
    finally:
        # Pair the __enter__() in deploy_sensors_stage; it was never exited.
        ctx.sensor_cm.__exit__(None, None, None)
| |
| |
def run_tests_stage(cfg, ctx):
    """Stage: run the tests from cfg['tests'] and store them in ctx.results.

    ctx.results becomes the list of (tool_name, results) pairs yielded by
    run_tests.  It is an empty list when no 'tests' section is configured.
    """
    ctx.results = []

    if 'tests' in cfg:
        # Read the tests from *cfg*, the mapping checked above, rather than
        # from the module-level cfg_dict.
        ctx.results.extend(run_tests(cfg, ctx.nodes))

    # TODO: optional 'start_test_nodes' support.  Launch VMs via
    # start_vms.launch_vms(cfg['start_test_nodes']['openstack']), add the
    # 'testnode' role to each new node and append it to ctx.nodes before the
    # tests run.
| |
| |
def disconnect_stage(cfg, ctx):
    """Cleanup stage: close every open node connection.

    Nodes without a connection (the attribute is missing or None, e.g.
    because connect_one failed) are skipped.  An error closing one connection
    is logged and does not stop the remaining connections from being closed.
    """
    for node in ctx.nodes:
        conn = getattr(node, 'connection', None)
        if conn is None:
            continue
        try:
            conn.close()
        except Exception:
            logger.exception("During disconnect from {0}".format(node))
| |
| |
def report_stage(cfg, ctx):
    """Stage: emit the collected results.

    When cfg['output_dest'] is set, write a JSON object with "sensor_data"
    and "results" keys to that path.  Otherwise pretty-print ctx.results to
    stdout between '=' rules.
    """
    results = getattr(ctx, 'results', [])
    output_dest = cfg.get('output_dest')
    if output_dest is not None:
        # ctx.sensor_data is set by remove_sensors_stage, a cleanup stage that
        # main() runs only after all regular stages (including this one).
        # So it is normally not present yet; use None instead of failing.
        # Build the data before opening, so a failure never leaves an empty
        # file behind.
        data = {"sensor_data": getattr(ctx, 'sensor_data', None),
                "results": results}
        with open(output_dest, "w") as fd:
            fd.write(json.dumps(data))
    else:
        print("=" * 20 + " RESULTS " + "=" * 20)
        pprint.pprint(results)
        print("=" * 60)
| |
| |
def complete_log_nodes_statistic(cfg, ctx):
    """Stage: log str() of every node in ctx.nodes at debug level."""
    for current in ctx.nodes:
        logger.debug(str(current))
| |
| |
class Context(object):
    """Mutable state passed to every stage run by main().

    Attributes that stages fill in are initialised here, so a stage that
    reads one (e.g. report_stage reading sensor_data) gets a defined value
    even when the stage that normally sets it has not run.
    """
    def __init__(self):
        self.nodes = []               # Node objects, filled by discover_stage
        self.clear_calls_stack = []   # cleanup stages; main() runs them reversed
        self.results = []             # (tool_name, results) from run_tests_stage
        self.sensor_data = None       # set by remove_sensors_stage
        self.sensor_cm = None         # monitoring context from deploy_sensors_stage
        self.sensors_control_queue = None  # queue returned by sensor_cm.__enter__()
        self.sensor_listen_thread = None   # save_sensors_data thread
| |
| |
def main(argv):
    """Entry point: run every pipeline stage, then all registered cleanups.

    Each stage is called as stage(cfg_dict, ctx).  Stages may push cleanup
    callables onto ctx.clear_calls_stack; these run in reverse registration
    order whether or not a stage raised.  A failing cleanup is logged and the
    remaining cleanups still run.  Returns 0 on success; an exception from a
    regular stage propagates (with its original traceback) after cleanup.
    """
    opts = parse_args(argv)

    level = logging.DEBUG if opts.extra_logs else logging.WARNING
    setup_logger(logger, level)

    # NOTE(review): opts.config_file and opts.output_dest are parsed but not
    # used; configuration comes from config.cfg_dict — confirm intended.
    stages = [
        discover_stage,
        connect_stage,
        complete_log_nodes_statistic,
        # deploy_sensors_stage,
        run_tests_stage,
        report_stage,
    ]

    ctx = Context()
    try:
        for stage in stages:
            logger.info("Start {0.__name__} stage".format(stage))
            stage(cfg_dict, ctx)
    finally:
        # Leaving this ``finally`` clause re-raises any pending stage
        # exception by itself, so no manual sys.exc_info()/re-raise is
        # needed.  (Under Python 2, sys.exc_info() inside ``finally`` does
        # not report the in-flight exception anyway.)
        # Iterate over a copy, in case a cleanup registers another one.
        for stage in ctx.clear_calls_stack[::-1]:
            try:
                logger.info("Start {0.__name__} stage".format(stage))
                stage(cfg_dict, ctx)
            except Exception:
                # Keep going so later cleanups still run, but don't hide it.
                logger.exception(
                    "Cleanup stage {0.__name__} failed".format(stage))

    return 0
| |
| |
if __name__ == '__main__':
    # Use sys.exit: the bare exit() builtin is injected by the site module
    # for interactive use and is missing when Python runs with -S.
    sys.exit(main(sys.argv))