Rework run_test.py: new test-runner code with sensor monitoring; overall line count dropped.
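
Example invocation, using the stages and the -l flag defined in
parse_args below (test and sensor settings come from config.cfg_dict):

    python run_test.py -l discover connect deploy_sensors run_tests
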
diff --git a/run_test.py b/run_test.py
index 25da0ab..d1a9f4f 100755
--- a/run_test.py
+++ b/run_test.py
@@ -1,120 +1,36 @@
-import os
import sys
-import json
-import time
+import Queue
import pprint
import logging
-import os.path
import argparse
+import threading
+import collections
+from concurrent.futures import ThreadPoolExecutor
+
+import utils
import ssh_utils
-import io_scenario
from nodes import discover
+from nodes.node import Node
from config import cfg_dict
-from utils import log_error
-from rest_api import add_test
-from formatters import get_formatter
from itest import IOPerfTest, PgBenchTest
+from sensors.api import start_monitoring
+
+
logger = logging.getLogger("io-perf-tool")
-logger.setLevel(logging.DEBUG)
-ch = logging.StreamHandler()
-ch.setLevel(logging.DEBUG)
-logger.addHandler(ch)
-
-log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
-formatter = logging.Formatter(log_format,
- "%H:%M:%S")
-ch.setFormatter(formatter)
-tool_type_mapper = {
- "iozone": IOPerfTest,
- "fio": IOPerfTest,
- "pgbench": PgBenchTest,
-}
+def setup_logger(logger, level=logging.DEBUG):
+ logger.setLevel(level)
+ ch = logging.StreamHandler()
+ ch.setLevel(level)
+ logger.addHandler(ch)
-
-def run_io_test(tool,
- script_args,
- test_runner,
- keep_temp_files=False):
-
- files_dir = os.path.dirname(io_scenario.__file__)
-
- path = 'iozone' if 'iozone' == tool else 'fio'
- src_testtool_path = os.path.join(files_dir, path)
-
- obj_cls = tool_type_mapper[tool]
- obj = obj_cls(script_args,
- src_testtool_path,
- None,
- keep_temp_files)
-
- return test_runner(obj)
-
-
-def conn_func(obj, barrier, latest_start_time, conn):
- try:
- test_iter = itest.run_test_iter(obj, conn)
- next(test_iter)
-
- wait_on_barrier(barrier, latest_start_time)
-
- with log_error("!Run test"):
- return next(test_iter)
- except:
- print traceback.format_exc()
- raise
-
-
-def parse_args(argv):
- parser = argparse.ArgumentParser(
- description="Run disk io performance test")
-
- parser.add_argument("-l", dest='extra_logs',
- action='store_true', default=False,
- help="print some extra log info")
-
- parser.add_argument('stages', nargs="+",
- choices=["discover", "connect", "start_new_nodes",
- "deploy_sensors"])
-
- # THIS ALL MOVE TO CONFIG FILE
- # parser.add_argument("-o", "--test-opts", dest='opts',
- # help="cmd line options for test")
-
- # parser.add_argument("-f", "--test-opts-file", dest='opts_file',
- # type=argparse.FileType('r'), default=None,
- # help="file with cmd line options for test")
-
- # parser.add_argument("--max-preparation-time", default=300,
- # type=int, dest="max_preparation_time")
-
- # parser.add_argument("-b", "--build-info", default=None,
- # dest="build_name")
-
- # parser.add_argument("-d", "--data-server-url", default=None,
- # dest="data_server_url")
-
- # parser.add_argument("-n", "--lab-name", default=None,
- # dest="lab_name")
-
- # parser.add_argument("--create-vms-opts", default=None,
- # help="Creating vm's before run ssh runner",
- # dest="create_vms_opts")
-
- # parser.add_argument("-k", "--keep", default=False,
- # help="keep temporary files",
- # dest="keep_temp_files", action='store_true')
-
- # parser.add_argument("--runner", required=True,
- # choices=["local", "ssh"], help="runner type")
-
- # parser.add_argument("--runner-extra-opts", default=None,
- # dest="runner_opts", help="runner extra options")
-
- return parser.parse_args(argv[1:])
+ log_format = '%(asctime)s - %(levelname)-6s - %(name)s - %(message)s'
+ formatter = logging.Formatter(log_format,
+ "%H:%M:%S")
+ ch.setFormatter(formatter)
def format_result(res, formatter):
@@ -127,59 +43,167 @@
def connect_one(node):
try:
- node.connection = ssh_utils.connect(node.connection_url)
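+        # only ssh:// connection urls are supported; anything else is rejected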
+ ssh_pref = "ssh://"
+ if node.conn_url.startswith(ssh_pref):
+ url = node.conn_url[len(ssh_pref):]
+ node.connection = ssh_utils.connect(url)
+ else:
+ raise ValueError("Unknown url type {0}".format(node.conn_url))
except Exception:
- logger.exception()
+ logger.exception("During connect to {0}".format(node))
def connect_all(nodes):
+ logger.info("Connecting to nodes")
+ with ThreadPoolExecutor(32) as pool:
+ list(pool.map(connect_one, nodes))
+
+
+def save_sensors_data(q):
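+    # Drain incoming sensor values until a None sentinel is received
+    # (pushed by main() when monitoring should stop).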
+ logger.info("Start receiving sensors data")
+ while True:
+ val = q.get()
+ if val is None:
+ break
+ # logger.debug("Sensors -> {0!r}".format(val))
+ logger.info("Sensors thread exits")
+
+
+def test_thread(test, node, barrier):
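+    # Prepare and run one test on a single node; exceptions are logged
+    # rather than propagated so one failing node does not kill the rest.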
+ try:
+ logger.debug("Run preparation for {0}".format(node.conn_url))
+ test.pre_run(node.connection)
+ logger.debug("Run test for {0}".format(node.conn_url))
+ test.run(node.connection, barrier)
+ except:
+ logger.exception("In test {0} for node {1}".format(test, node))
+
+
+def run_tests(config, nodes):
+ tool_type_mapper = {
+ "io": IOPerfTest,
+ "pgbench": PgBenchTest,
+ }
+
+ test_nodes = [node for node in nodes
+ if 'testnode' in node.roles]
+
+ res_q = Queue.Queue()
+
+ for name, params in config['tests'].items():
+ logger.info("Starting {0} tests".format(name))
+
+ threads = []
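+        # All test threads wait on a shared barrier so the measurement
+        # starts simultaneously on every test node.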
+ barrier = utils.Barrier(len(test_nodes))
+ for node in test_nodes:
+ msg = "Starting {0} test on {1} node"
+ logger.debug(msg.format(name, node.conn_url))
+ test = tool_type_mapper[name](params, res_q.put)
+ th = threading.Thread(None, test_thread, None,
+ (test, node, barrier))
+ threads.append(th)
+ th.daemon = True
+ th.start()
+
+ for th in threads:
+ th.join()
+
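+        # drain the result queue and log each raw result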
+ while not res_q.empty():
+ logger.info("Get test result {0!r}".format(res_q.get()))
+
+
+def parse_args(argv):
+ parser = argparse.ArgumentParser(
+ description="Run disk io performance test")
+
+ parser.add_argument("-l", dest='extra_logs',
+ action='store_true', default=False,
+ help="print some extra log info")
+
+ parser.add_argument('stages', nargs="+",
+ choices=["discover", "connect", "start_new_nodes",
+ "deploy_sensors", "run_tests"])
+
+ return parser.parse_args(argv[1:])
+
+
+def log_nodes_statistic(nodes):
+ logger.info("Found {0} nodes total".format(len(nodes)))
+    per_role = collections.defaultdict(int)
+ for node in nodes:
+ for role in node.roles:
+ per_role[role] += 1
+
+ for role, count in sorted(per_role.items()):
+ logger.debug("Found {0} nodes with role {1}".format(count, role))
+
+
+def log_sensors_config(cfg):
pass
def main(argv):
- logging_conf = cfg_dict.get('logging')
- if logging_conf:
- if logging_conf.get('extra_logs'):
- logger.setLevel(logging.DEBUG)
- ch.setLevel(logging.DEBUG)
-
opts = parse_args(argv)
+
+ level = logging.DEBUG if opts.extra_logs else logging.WARNING
+ setup_logger(logger, level)
+
+ nodes = []
+
if 'discover' in opts.stages:
- current_data = discover.discover(cfg_dict.get('discover'))
+ logger.info("Start node discovery")
+ nodes = discover.discover(cfg_dict.get('discover'))
+
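+    # Nodes listed explicitly in the config are added to the discovered
+    # set; their roles are given as a comma-separated string.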
+ if 'explicit_nodes' in cfg_dict:
+ for url, roles in cfg_dict['explicit_nodes'].items():
+ nodes.append(Node(url, roles.split(",")))
+
+ log_nodes_statistic(nodes)
if 'connect' in opts.stages:
- for node in current_data:
- pass
+ connect_all(nodes)
- print "\n".join(map(str, current_data))
+ if 'deploy_sensors' in opts.stages:
+ logger.info("Deploing sensors")
+ cfg = cfg_dict.get('sensors')
+ sens_cfg = []
+
+ for role, sensors_str in cfg["roles_mapping"].items():
+ sensors = [sens.strip() for sens in sensors_str.split(",")]
+
+ collect_cfg = dict((sensor, {}) for sensor in sensors)
+
+ for node in nodes:
+ if role in node.roles:
+ sens_cfg.append((node.connection, collect_cfg))
+
+ log_sensors_config(sens_cfg)
+
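+        # start_monitoring returns a context manager that yields the
+        # queue of incoming sensor values.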
+ sensor_cm = start_monitoring(cfg["receiver_uri"], None,
+ connected_config=sens_cfg)
+
+        with sensor_cm as sensors_control_queue:
+            th = threading.Thread(None, save_sensors_data, None,
+                                  (sensors_control_queue,))
+            th.daemon = True
+            th.start()
+
+            # TODO: wait till all nodes start to send sensors data
+
+            if 'run_tests' in opts.stages:
+                run_tests(cfg_dict, nodes)
+
+            sensors_control_queue.put(None)
+            th.join()
+ elif 'run_tests' in opts.stages:
+ run_tests(cfg_dict, nodes)
+
+ logger.info("Disconnecting")
+    for node in nodes:
+        # skip nodes that were never connected (or failed to connect)
+        if getattr(node, "connection", None) is not None:
+            node.connection.close()
+
return 0
- # tests = cfg_dict.get("tests", [])
-
- # Deploy and start sensors
- # deploy_and_start_sensors(cfg_dict.get('sensors'), nodes_to_run)
-
- # for test_name, opts in tests.items():
- # cmd_line = " ".join(opts['opts'])
- # logger.debug("Run test with {0!r} params".format(cmd_line))
- # latest_start_time = 300 + time.time()
- # uris = [node.connection_url for node in nodes_to_run]
- # runner = ssh_runner.get_ssh_runner(uris, conn_func,
- # latest_start_time,
- # opts.get('keep_temp_files'))
- # res = run_io_test(test_name,
- # opts['opts'],
- # runner,
- # opts.get('keep_temp_files'))
- # logger.debug(format_result(res, get_formatter(test_name)))
-
- # if cfg_dict.get('data_server_url'):
- # result = json.loads(get_formatter(opts.tool_type)(res))
- # result['name'] = opts.build_name
- # add_test(opts.build_name, result, opts.data_server_url)
-
- # return 0
-
if __name__ == '__main__':
exit(main(sys.argv))