blob: 1dac2e5050017681744fed166e8970892a9aeb61 [file] [log] [blame]
import os
import pickle
import sys
import json
import Queue
import pprint
import logging
import argparse
import threading
import collections
from concurrent.futures import ThreadPoolExecutor
import utils
import ssh_utils
import start_vms
from nodes import discover
from nodes.node import Node
from config import cfg_dict, parse_config
from tests.itest import IOPerfTest, PgBenchTest
from sensors.api import start_monitoring
logger = logging.getLogger("io-perf-tool")
def setup_logger(logger, level=logging.DEBUG):
logger.setLevel(level)
ch = logging.StreamHandler()
ch.setLevel(level)
logger.addHandler(ch)
log_format = '%(asctime)s - %(levelname)-6s - %(name)s - %(message)s'
formatter = logging.Formatter(log_format,
"%H:%M:%S")
ch.setFormatter(formatter)
def format_result(res, formatter):
data = "\n{0}\n".format("=" * 80)
data += pprint.pformat(res) + "\n"
data += "{0}\n".format("=" * 80)
templ = "{0}\n\n====> {1}\n\n{2}\n\n"
return templ.format(data, formatter(res), "=" * 80)
def connect_one(node):
try:
ssh_pref = "ssh://"
if node.conn_url.startswith(ssh_pref):
url = node.conn_url[len(ssh_pref):]
node.connection = ssh_utils.connect(url)
else:
raise ValueError("Unknown url type {0}".format(node.conn_url))
except Exception:
logger.exception("During connect to {0}".format(node))
def connect_all(nodes):
logger.info("Connecting to nodes")
with ThreadPoolExecutor(32) as pool:
list(pool.map(connect_one, nodes))
logger.info("All nodes connected successfully")
def save_sensors_data(q):
logger.info("Start receiving sensors data")
sensor_data = []
while True:
val = q.get()
if val is None:
print sensor_data
q.put(sensor_data)
break
sensor_data.append(val)
logger.info("Sensors thread exits")
def test_thread(test, node, barrier):
try:
logger.debug("Run preparation for {0}".format(node.conn_url))
test.pre_run(node.connection)
logger.debug("Run test for {0}".format(node.conn_url))
test.run(node.connection, barrier)
except:
logger.exception("In test {0} for node {1}".format(test, node))
def run_tests(config, nodes):
tool_type_mapper = {
"io": IOPerfTest,
"pgbench": PgBenchTest,
}
test_nodes = [node for node in nodes
if 'testnode' in node.roles]
res_q = Queue.Queue()
for test in config['tests']:
for test in config['tests'][test]['internal_tests']:
for name, params in test.items():
logger.info("Starting {0} tests".format(name))
threads = []
barrier = utils.Barrier(len(test_nodes))
for node in test_nodes:
msg = "Starting {0} test on {1} node"
logger.debug(msg.format(name, node.conn_url))
test = tool_type_mapper[name](params, res_q.put)
th = threading.Thread(None, test_thread, None,
(test, node, barrier))
threads.append(th)
th.daemon = True
th.start()
for th in threads:
th.join()
results = []
while not res_q.empty():
results.append(res_q.get())
# logger.info("Get test result {0!r}".format(results[-1]))
yield name, results
def parse_args(argv):
parser = argparse.ArgumentParser(
description="Run disk io performance test")
parser.add_argument("-l", dest='extra_logs',
action='store_true', default=False,
help="print some extra log info")
parser.add_argument("-b", '--build_description',
type=str, default="Build info")
parser.add_argument("-i", '--build_id', type=str, default="id")
parser.add_argument("-t", '--build_type', type=str, default="GA")
parser.add_argument("-u", '--username', type=str, default="admin")
parser.add_argument("-o", '--output-dest', nargs="*")
parser.add_argument("config_file", nargs="?", default="config.yaml")
return parser.parse_args(argv[1:])
def log_nodes_statistic(_, ctx):
nodes = ctx.nodes
logger.info("Found {0} nodes total".format(len(nodes)))
per_role = collections.defaultdict(lambda: 0)
for node in nodes:
for role in node.roles:
per_role[role] += 1
for role, count in sorted(per_role.items()):
logger.debug("Found {0} nodes with role {1}".format(count, role))
def log_sensors_config(cfg):
pass
def connect_stage(cfg, ctx):
ctx.clear_calls_stack.append(disconnect_stage)
connect_all(ctx.nodes)
def discover_stage(cfg, ctx):
if 'discover' in cfg:
discover_objs = [i.strip() for i in cfg['discover'].strip().split(",")]
ctx.nodes.extend(discover.discover(discover_objs, cfg['clouds']))
for url, roles in cfg.get('explicit_nodes', {}).items():
ctx.nodes.append(Node(url, roles.split(",")))
def deploy_sensors_stage(cfg_dict, ctx):
ctx.clear_calls_stack.append(remove_sensors_stage)
if 'sensors' not in cfg_dict:
return
cfg = cfg_dict.get('sensors')
sens_cfg = []
for role, sensors_str in cfg["roles_mapping"].items():
sensors = [sens.strip() for sens in sensors_str.split(",")]
collect_cfg = dict((sensor, {}) for sensor in sensors)
for node in ctx.nodes:
if role in node.roles:
sens_cfg.append((node.connection, collect_cfg))
log_sensors_config(sens_cfg)
ctx.sensor_cm = start_monitoring(cfg["receiver_uri"], None,
connected_config=sens_cfg)
ctx.sensors_control_queue = ctx.sensor_cm.__enter__()
th = threading.Thread(None, save_sensors_data, None,
(ctx.sensors_control_queue,))
th.daemon = True
th.start()
ctx.sensor_listen_thread = th
def remove_sensors_stage(cfg, ctx):
ctx.sensors_control_queue.put(None)
ctx.sensor_listen_thread.join()
ctx.sensor_data = ctx.sensors_control_queue.get()
def run_all_test(cfg, ctx, store_nodes):
ctx.results = []
if 'start_test_nodes' in cfg['tests']:
params = cfg['tests']['start_test_nodes']['openstack']
for new_node in start_vms.launch_vms(params):
new_node.roles.append('testnode')
ctx.nodes.append(new_node)
if 'tests' in cfg:
store_nodes(ctx.nodes)
ctx.results.extend(run_tests(cfg_dict, ctx.nodes))
def shut_down_vms(cfg, ctx):
with open('vm_journal.log') as f:
data = str(f.read())
nodes = pickle.loads(data)
for node in nodes:
logger.info("Node " + str(node) + " has been loaded")
logger.info("Removing nodes")
start_vms.clear_nodes()
logger.info("Nodes has been removed")
def store_nodes(nodes):
with open('vm_journal.log', 'w+') as f:
f.write(pickle.dumps([nodes]))
for node in nodes:
logger.info("Node " + str(node) + " has been stored")
def clear_enviroment(cfg, ctx):
if os.path.exists('vm_journal.log'):
shut_down_vms(cfg, ctx)
os.remove('vm_journal.log')
def run_tests_stage(cfg, ctx):
# clear nodes that possible were created on previous test running
clear_enviroment(cfg, ctx)
ctx.clear_calls_stack.append(shut_down_vms)
run_all_test(cfg, ctx, store_nodes)
def disconnect_stage(cfg, ctx):
for node in ctx.nodes:
if node.connection is not None:
node.connection.close()
def report_stage(cfg, ctx):
output_dest = cfg.get('output_dest')
if output_dest is not None:
with open(output_dest, "w") as fd:
data = {"sensor_data": ctx.sensor_data,
"results": ctx.results}
fd.write(json.dumps(data))
else:
print "=" * 20 + " RESULTS " + "=" * 20
pprint.pprint(ctx.results)
print "=" * 60
def complete_log_nodes_statistic(cfg, ctx):
nodes = ctx.nodes
for node in nodes:
logger.debug(str(node))
class Context(object):
def __init__(self):
self.build_meta = {}
self.nodes = []
self.clear_calls_stack = []
def load_config(path):
global cfg_dict
cfg_dict = parse_config(path)
def main(argv):
opts = parse_args(argv)
level = logging.DEBUG if opts.extra_logs else logging.WARNING
setup_logger(logger, level)
stages = [
discover_stage,
connect_stage,
complete_log_nodes_statistic,
# deploy_sensors_stage,
run_tests_stage,
report_stage
]
load_config(opts.config_file)
ctx = Context()
ctx.build_meta['build_id'] = opts.build_id
ctx.build_meta['build_descrption'] = opts.build_description
ctx.build_meta['build_type'] = opts.build_type
ctx.build_meta['username'] = opts.username
logger.setLevel(logging.INFO)
logger.addHandler(logging.FileHandler('log.txt'))
try:
for stage in stages:
logger.info("Start {0.__name__} stage".format(stage))
print "Start {0.__name__} stage".format(stage)
stage(cfg_dict, ctx)
finally:
exc, cls, tb = sys.exc_info()
for stage in ctx.clear_calls_stack[::-1]:
try:
logger.info("Start {0.__name__} stage".format(stage))
stage(cfg_dict, ctx)
except:
pass
if exc is not None:
raise exc, cls, tb
return 0
if __name__ == '__main__':
exit(main(sys.argv))