large commit. new code, with sensors, line count dropped, etc
diff --git a/run_test.py b/run_test.py
index 25da0ab..d1a9f4f 100755
--- a/run_test.py
+++ b/run_test.py
@@ -1,120 +1,36 @@
-import os
 import sys
-import json
-import time
+import Queue
 import pprint
 import logging
-import os.path
 import argparse
+import threading
+import collections
 
+from concurrent.futures import ThreadPoolExecutor
+
+import utils
 import ssh_utils
-import io_scenario
 from nodes import discover
+from nodes.node import Node
 from config import cfg_dict
-from utils import log_error
-from rest_api import add_test
-from formatters import get_formatter
 from itest import IOPerfTest, PgBenchTest
 
+from sensors.api import start_monitoring
+
+
 logger = logging.getLogger("io-perf-tool")
-logger.setLevel(logging.DEBUG)
-ch = logging.StreamHandler()
-ch.setLevel(logging.DEBUG)
-logger.addHandler(ch)
-
-log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
-formatter = logging.Formatter(log_format,
-                              "%H:%M:%S")
-ch.setFormatter(formatter)
 
 
-tool_type_mapper = {
-    "iozone": IOPerfTest,
-    "fio": IOPerfTest,
-    "pgbench": PgBenchTest,
-}
+def setup_logger(logger, level=logging.DEBUG):
+    logger.setLevel(level)
+    ch = logging.StreamHandler()
+    ch.setLevel(level)
+    logger.addHandler(ch)
 
-
-def run_io_test(tool,
-                script_args,
-                test_runner,
-                keep_temp_files=False):
-
-    files_dir = os.path.dirname(io_scenario.__file__)
-
-    path = 'iozone' if 'iozone' == tool else 'fio'
-    src_testtool_path = os.path.join(files_dir, path)
-
-    obj_cls = tool_type_mapper[tool]
-    obj = obj_cls(script_args,
-                  src_testtool_path,
-                  None,
-                  keep_temp_files)
-
-    return test_runner(obj)
-
-
-def conn_func(obj, barrier, latest_start_time, conn):
-    try:
-        test_iter = itest.run_test_iter(obj, conn)
-        next(test_iter)
-
-        wait_on_barrier(barrier, latest_start_time)
-
-        with log_error("!Run test"):
-            return next(test_iter)
-    except:
-        print traceback.format_exc()
-        raise
-
-
-def parse_args(argv):
-    parser = argparse.ArgumentParser(
-        description="Run disk io performance test")
-
-    parser.add_argument("-l", dest='extra_logs',
-                        action='store_true', default=False,
-                        help="print some extra log info")
-
-    parser.add_argument('stages', nargs="+",
-                        choices=["discover", "connect", "start_new_nodes",
-                                 "deploy_sensors"])
-
-    # THIS ALL MOVE TO CONFIG FILE
-    # parser.add_argument("-o", "--test-opts", dest='opts',
-    #                     help="cmd line options for test")
-
-    # parser.add_argument("-f", "--test-opts-file", dest='opts_file',
-    #                     type=argparse.FileType('r'), default=None,
-    #                     help="file with cmd line options for test")
-
-    # parser.add_argument("--max-preparation-time", default=300,
-    #                     type=int, dest="max_preparation_time")
-
-    # parser.add_argument("-b", "--build-info", default=None,
-    #                     dest="build_name")
-
-    # parser.add_argument("-d", "--data-server-url", default=None,
-    #                     dest="data_server_url")
-
-    # parser.add_argument("-n", "--lab-name", default=None,
-    #                     dest="lab_name")
-
-    # parser.add_argument("--create-vms-opts", default=None,
-    #                     help="Creating vm's before run ssh runner",
-    #                     dest="create_vms_opts")
-
-    # parser.add_argument("-k", "--keep", default=False,
-    #                     help="keep temporary files",
-    #                     dest="keep_temp_files", action='store_true')
-
-    # parser.add_argument("--runner", required=True,
-    #                     choices=["local", "ssh"], help="runner type")
-
-    # parser.add_argument("--runner-extra-opts", default=None,
-    #                     dest="runner_opts", help="runner extra options")
-
-    return parser.parse_args(argv[1:])
+    log_format = '%(asctime)s - %(levelname)-6s - %(name)s - %(message)s'
+    formatter = logging.Formatter(log_format,
+                                  "%H:%M:%S")
+    ch.setFormatter(formatter)
 
 
 def format_result(res, formatter):
@@ -127,59 +43,167 @@
 
 def connect_one(node):
     try:
-        node.connection = ssh_utils.connect(node.connection_url)
+        ssh_pref = "ssh://"
+        if node.conn_url.startswith(ssh_pref):
+            url = node.conn_url[len(ssh_pref):]
+            node.connection = ssh_utils.connect(url)
+        else:
+            raise ValueError("Unknown url type {0}".format(node.conn_url))
     except Exception:
-        logger.exception()
+        logger.exception("During connect to {0}".format(node))
 
 
 def connect_all(nodes):
+    logger.info("Connecting to nodes")
+    with ThreadPoolExecutor(32) as pool:
+        list(pool.map(connect_one, nodes))
+
+
+def save_sensors_data(q):
+    logger.info("Start receiving sensors data")
+    while True:
+        val = q.get()
+        if val is None:
+            break
+        # logger.debug("Sensors -> {0!r}".format(val))
+    logger.info("Sensors thread exits")
+
+
+def test_thread(test, node, barrier):
+    try:
+        logger.debug("Run preparation for {0}".format(node.conn_url))
+        test.pre_run(node.connection)
+        logger.debug("Run test for {0}".format(node.conn_url))
+        test.run(node.connection, barrier)
+    except:
+        logger.exception("In test {0} for node {1}".format(test, node))
+
+
+def run_tests(config, nodes):
+    tool_type_mapper = {
+        "io": IOPerfTest,
+        "pgbench": PgBenchTest,
+    }
+
+    test_nodes = [node for node in nodes
+                  if 'testnode' in node.roles]
+
+    res_q = Queue.Queue()
+
+    for name, params in config['tests'].items():
+        logger.info("Starting {0} tests".format(name))
+
+        threads = []
+        barrier = utils.Barrier(len(test_nodes))
+        for node in test_nodes:
+            msg = "Starting {0} test on {1} node"
+            logger.debug(msg.format(name, node.conn_url))
+            test = tool_type_mapper[name](params, res_q.put)
+            th = threading.Thread(None, test_thread, None,
+                                  (test, node, barrier))
+            threads.append(th)
+            th.daemon = True
+            th.start()
+
+        for th in threads:
+            th.join()
+
+        while not res_q.empty():
+            logger.info("Get test result {0!r}".format(res_q.get()))
+
+
+def parse_args(argv):
+    parser = argparse.ArgumentParser(
+        description="Run disk io performance test")
+
+    parser.add_argument("-l", dest='extra_logs',
+                        action='store_true', default=False,
+                        help="print some extra log info")
+
+    parser.add_argument('stages', nargs="+",
+                        choices=["discover", "connect", "start_new_nodes",
+                                 "deploy_sensors", "run_tests"])
+
+    return parser.parse_args(argv[1:])
+
+
+def log_nodes_statistic(nodes):
+    logger.info("Found {0} nodes total".format(len(nodes)))
+    per_role = collections.defaultdict(lambda: 0)
+    for node in nodes:
+        for role in node.roles:
+            per_role[role] += 1
+
+    for role, count in sorted(per_role.items()):
+        logger.debug("Found {0} nodes with role {1}".format(count, role))
+
+
+def log_sensors_config(cfg):
     pass
 
 
 def main(argv):
-    logging_conf = cfg_dict.get('logging')
-    if logging_conf:
-        if logging_conf.get('extra_logs'):
-            logger.setLevel(logging.DEBUG)
-            ch.setLevel(logging.DEBUG)
-
     opts = parse_args(argv)
+
+    level = logging.DEBUG if opts.extra_logs else logging.WARNING
+    setup_logger(logger, level)
+
+    nodes = []
+
     if 'discover' in opts.stages:
-        current_data = discover.discover(cfg_dict.get('discover'))
+        logger.info("Start node discovery")
+        nodes = discover.discover(cfg_dict.get('discover'))
+
+    if 'explicit_nodes' in cfg_dict:
+        for url, roles in cfg_dict['explicit_nodes'].items():
+            nodes.append(Node(url, roles.split(",")))
+
+    log_nodes_statistic(nodes)
 
     if 'connect' in opts.stages:
-        for node in current_data:
-            pass
+        connect_all(nodes)
 
-    print "\n".join(map(str, current_data))
+    if 'deploy_sensors' in opts.stages:
+        logger.info("Deploing sensors")
+        cfg = cfg_dict.get('sensors')
+        sens_cfg = []
+
+        for role, sensors_str in cfg["roles_mapping"].items():
+            sensors = [sens.strip() for sens in sensors_str.split(",")]
+
+            collect_cfg = dict((sensor, {}) for sensor in sensors)
+
+            for node in nodes:
+                if role in node.roles:
+                    sens_cfg.append((node.connection, collect_cfg))
+
+        log_sensors_config(sens_cfg)
+
+        sensor_cm = start_monitoring(cfg["receiver_uri"], None,
+                                     connected_config=sens_cfg)
+
+        with sensor_cm as sensors_control_queue:
+            th = threading.Thread(None, save_sensors_data, None,
+                                  (sensors_control_queue,))
+            th.daemon = True
+            th.start()
+
+            # TODO: wait till all nodes start to send sensors data
+
+            if 'run_tests' in opts.stages:
+                run_tests(cfg_dict, nodes)
+
+            sensors_control_queue.put(None)
+            th.join()
+    elif 'run_tests' in opts.stages:
+        run_tests(cfg_dict, nodes)
+
+    logger.info("Disconnecting")
+    for node in nodes:
+        node.connection.close()
+
     return 0
 
-    # tests = cfg_dict.get("tests", [])
-
-    # Deploy and start sensors
-    # deploy_and_start_sensors(cfg_dict.get('sensors'), nodes_to_run)
-
-    # for test_name, opts in tests.items():
-    #     cmd_line = " ".join(opts['opts'])
-    #     logger.debug("Run test with {0!r} params".format(cmd_line))
-    #     latest_start_time = 300 + time.time()
-    #     uris = [node.connection_url for node in nodes_to_run]
-    #     runner = ssh_runner.get_ssh_runner(uris, conn_func,
-    #                                        latest_start_time,
-    #                                        opts.get('keep_temp_files'))
-    #     res = run_io_test(test_name,
-    #                       opts['opts'],
-    #                       runner,
-    #                       opts.get('keep_temp_files'))
-    #     logger.debug(format_result(res, get_formatter(test_name)))
-
-    # if cfg_dict.get('data_server_url'):
-    #     result = json.loads(get_formatter(opts.tool_type)(res))
-    #     result['name'] = opts.build_name
-    #     add_test(opts.build_name, result, opts.data_server_url)
-
-    # return 0
-
 
 if __name__ == '__main__':
     exit(main(sys.argv))