blob: d1a9f4fd99ec9c1fe9d98a2e4cb2943e269b3532 [file] [log] [blame]
import sys
import Queue
import pprint
import logging
import argparse
import threading
import collections
from concurrent.futures import ThreadPoolExecutor
import utils
import ssh_utils
from nodes import discover
from nodes.node import Node
from config import cfg_dict
from itest import IOPerfTest, PgBenchTest
from sensors.api import start_monitoring
# Shared module-level logger used by every function in this tool.
logger = logging.getLogger("io-perf-tool")
def setup_logger(logger, level=logging.DEBUG):
    """Configure *logger* to emit timestamped records to stderr.

    Sets the logger and its (new) stream handler to *level* and installs
    a ``HH:MM:SS - LEVEL - name - message`` formatter.
    """
    log_format = '%(asctime)s - %(levelname)-6s - %(name)s - %(message)s'
    handler = logging.StreamHandler()
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter(log_format, "%H:%M:%S"))
    logger.setLevel(level)
    logger.addHandler(handler)
def format_result(res, formatter):
    """Render *res* twice: pretty-printed raw, then through *formatter*.

    The raw ``pprint`` dump is framed by ``=`` rules; the formatted view
    follows after a ``====>`` marker.
    """
    rule = "=" * 80
    raw_view = "\n{0}\n{1}\n{2}\n".format(rule, pprint.pformat(res), rule)
    return "{0}\n\n====> {1}\n\n{2}\n\n".format(raw_view, formatter(res), rule)
def connect_one(node):
    """Open an SSH connection for *node* and store it on the node.

    Failures (including unsupported URL schemes) are logged with a
    traceback rather than raised, so one bad node does not abort the run.
    """
    prefix = "ssh://"
    try:
        if not node.conn_url.startswith(prefix):
            raise ValueError("Unknown url type {0}".format(node.conn_url))
        node.connection = ssh_utils.connect(node.conn_url[len(prefix):])
    except Exception:
        logger.exception("During connect to {0}".format(node))
def connect_all(nodes):
    """Connect to every node concurrently (at most 32 in flight)."""
    logger.info("Connecting to nodes")
    with ThreadPoolExecutor(32) as pool:
        # Exhaust the iterator so every connect_one actually runs.
        for _ in pool.map(connect_one, nodes):
            pass
def save_sensors_data(q):
    """Drain sensor readings from queue *q* until a ``None`` sentinel.

    Intended to run in a daemon thread; the producer signals shutdown by
    putting ``None`` on the queue.
    """
    logger.info("Start receiving sensors data")
    while True:
        reading = q.get()
        if reading is None:
            break
        # logger.debug("Sensors -> {0!r}".format(reading))
    logger.info("Sensors thread exits")
def test_thread(test, node, barrier):
    """Prepare and execute *test* against *node*; never propagate errors.

    Runs inside a worker thread spawned by run_tests, so any exception is
    logged with a traceback instead of raised (an uncaught exception would
    only kill this thread silently).
    """
    try:
        logger.debug("Run preparation for {0}".format(node.conn_url))
        test.pre_run(node.connection)
        logger.debug("Run test for {0}".format(node.conn_url))
        test.run(node.connection, barrier)
    except Exception:
        # Narrowed from a bare ``except:``, which would also swallow
        # KeyboardInterrupt/SystemExit and make the process unkillable.
        logger.exception("In test {0} for node {1}".format(test, node))
def run_tests(config, nodes):
    """Run every suite from ``config['tests']`` on all 'testnode' nodes.

    Each suite gets one thread per test node, synchronized by a barrier;
    results are pushed onto a queue by the tests and logged after all
    threads of the suite finish.
    """
    tool_type_mapper = {
        "io": IOPerfTest,
        "pgbench": PgBenchTest,
    }
    test_nodes = [node for node in nodes if 'testnode' in node.roles]
    res_q = Queue.Queue()

    for name, params in config['tests'].items():
        logger.info("Starting {0} tests".format(name))
        barrier = utils.Barrier(len(test_nodes))
        workers = []
        for node in test_nodes:
            msg = "Starting {0} test on {1} node"
            logger.debug(msg.format(name, node.conn_url))
            test = tool_type_mapper[name](params, res_q.put)
            worker = threading.Thread(None, test_thread, None,
                                      (test, node, barrier))
            workers.append(worker)
            worker.daemon = True
            worker.start()
        for worker in workers:
            worker.join()
        while not res_q.empty():
            logger.info("Get test result {0!r}".format(res_q.get()))
def parse_args(argv):
    """Parse command-line options; ``argv[0]`` (program name) is skipped.

    Returns an argparse namespace with ``extra_logs`` (bool) and
    ``stages`` (non-empty list of stage names).
    """
    parser = argparse.ArgumentParser(
        description="Run disk io performance test")
    parser.add_argument("-l", dest='extra_logs',
                        default=False, action='store_true',
                        help="print some extra log info")
    stage_choices = ["discover", "connect", "start_new_nodes",
                     "deploy_sensors", "run_tests"]
    parser.add_argument('stages', nargs="+", choices=stage_choices)
    return parser.parse_args(argv[1:])
def log_nodes_statistic(nodes):
    """Log the total node count and a per-role breakdown (sorted by role)."""
    logger.info("Found {0} nodes total".format(len(nodes)))
    role_counts = collections.Counter(role
                                      for node in nodes
                                      for role in node.roles)
    for role, count in sorted(role_counts.items()):
        logger.debug("Found {0} nodes with role {1}".format(count, role))
def log_sensors_config(cfg):
    # Placeholder: logging of the assembled sensors configuration is not
    # implemented yet; main() already calls this with the per-node list.
    pass
def main(argv):
    """Run the requested stages in order: discover, connect,
    deploy_sensors, run_tests; returns 0 as the process exit code.
    """
    opts = parse_args(argv)
    # -l raises verbosity from WARNING to DEBUG.
    level = logging.DEBUG if opts.extra_logs else logging.WARNING
    setup_logger(logger, level)
    nodes = []
    if 'discover' in opts.stages:
        logger.info("Start node discovery")
        nodes = discover.discover(cfg_dict.get('discover'))
    # Nodes listed explicitly in the config are appended regardless of
    # whether the discover stage ran; roles are a comma-separated string.
    if 'explicit_nodes' in cfg_dict:
        for url, roles in cfg_dict['explicit_nodes'].items():
            nodes.append(Node(url, roles.split(",")))
    log_nodes_statistic(nodes)
    if 'connect' in opts.stages:
        connect_all(nodes)
    if 'deploy_sensors' in opts.stages:
        logger.info("Deploing sensors")  # NOTE(review): typo "Deploing" kept — runtime string
        cfg = cfg_dict.get('sensors')
        sens_cfg = []
        # Map each role to its sensor list; every node holding that role
        # gets the same per-sensor (empty) collection config.
        for role, sensors_str in cfg["roles_mapping"].items():
            sensors = [sens.strip() for sens in sensors_str.split(",")]
            collect_cfg = dict((sensor, {}) for sensor in sensors)
            for node in nodes:
                if role in node.roles:
                    # assumes node.connection was set by the connect stage
                    # — TODO confirm deploy_sensors always runs after it
                    sens_cfg.append((node.connection, collect_cfg))
        log_sensors_config(sens_cfg)
        sensor_cm = start_monitoring(cfg["receiver_uri"], None,
                                     connected_config=sens_cfg)
        with sensor_cm as sensors_control_queue:
            # Drain sensor data in a daemon thread while the tests run.
            th = threading.Thread(None, save_sensors_data, None,
                                  (sensors_control_queue,))
            th.daemon = True
            th.start()
            # TODO: wait till all nodes start to send sensors data
            if 'run_tests' in opts.stages:
                run_tests(cfg_dict, nodes)
            # None is the shutdown sentinel for save_sensors_data.
            sensors_control_queue.put(None)
            th.join()
    elif 'run_tests' in opts.stages:
        # No sensors requested: run the tests without monitoring.
        run_tests(cfg_dict, nodes)
    logger.info("Disconnecting")
    for node in nodes:
        # NOTE(review): if connect_one failed, node.connection may be
        # unset/None and this close() would raise — verify.
        node.connection.close()
    return 0
if __name__ == '__main__':
    # Use sys.exit: the builtin exit() is injected by the ``site`` module
    # for interactive use and is not guaranteed to exist (e.g. python -S).
    sys.exit(main(sys.argv))