fixing, improve sensors installation code
diff --git a/wally/run_test.py b/wally/run_test.py
index 7899bae..42e96ff 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -2,25 +2,24 @@
 
 import os
 import sys
-import time
 import Queue
 import pprint
 import logging
 import argparse
+import functools
 import threading
+import contextlib
 import collections
 
 import yaml
 from concurrent.futures import ThreadPoolExecutor
 
 from wally import pretty_yaml
+from wally.tests_sensors import deploy_sensors_stage
 from wally.discover import discover, Node, undiscover
 from wally import utils, report, ssh_utils, start_vms
 from wally.suits.itest import IOPerfTest, PgBenchTest
 from wally.config import cfg_dict, load_config, setup_loggers
-from wally.sensors.api import (start_monitoring,
-                               deploy_and_start_sensors,
-                               SensorConfig)
 
 
 logger = logging.getLogger("wally")
@@ -40,54 +39,37 @@
         self.nodes = []
         self.clear_calls_stack = []
         self.openstack_nodes_ids = []
-        self.sensor_cm = None
-        self.keep_vm = False
-        self.sensors_control_queue = None
-        self.sensor_listen_thread = None
+        self.sensors_mon_q = None
 
 
-def connect_one(node):
+def connect_one(node, vm=False):
     try:
         ssh_pref = "ssh://"
         if node.conn_url.startswith(ssh_pref):
             url = node.conn_url[len(ssh_pref):]
-            logger.debug("Try connect to " + url)
-            node.connection = ssh_utils.connect(url)
+
+            if vm:
+                ret_count = 24
+                log_warns = False
+            else:
+                ret_count = 3
+                log_warns = True
+
+            node.connection = ssh_utils.connect(url,
+                                                retry_count=ret_count,
+                                                log_warns=log_warns)
         else:
             raise ValueError("Unknown url type {0}".format(node.conn_url))
     except Exception:
-        logger.exception("During connect to {0}".format(node))
-        raise
+        logger.exception("During connect to " + node.get_conn_id())
+        node.connection = None
 
 
-def connect_all(nodes):
+def connect_all(nodes, vm=False):
     logger.info("Connecting to nodes")
     with ThreadPoolExecutor(32) as pool:
-        list(pool.map(connect_one, nodes))
-    logger.info("All nodes connected successfully")
-
-
-def save_sensors_data(q, mon_q, fd):
-    logger.info("Start receiving sensors data")
-    fd.write("\n")
-
-    observed_nodes = set()
-
-    try:
-        while True:
-            val = q.get()
-            if val is None:
-                break
-
-            addr, data = val
-            if addr not in observed_nodes:
-                mon_q.put(addr)
-                observed_nodes.add(addr)
-
-            fd.write("{0!s} : {1!r}\n".format(time.time(), repr(val)))
-    except Exception:
-        logger.exception("Error in sensors thread")
-    logger.info("Sensors thread exits")
+        connect_one_f = functools.partial(connect_one, vm=vm)
+        list(pool.map(connect_one_f, nodes))
 
 
 def test_thread(test, node, barrier, res_q):
@@ -185,11 +167,23 @@
     ctx.clear_calls_stack.append(disconnect_stage)
     connect_all(ctx.nodes)
 
+    all_ok = True
 
-def make_undiscover_stage(clean_data):
-    def undiscover_stage(cfg, ctx):
-        undiscover(clean_data)
-    return undiscover_stage
+    for node in ctx.nodes:
+        if node.connection is None:
+            if 'testnode' in node.roles:
+                msg = "Can't connect to testnode {0}"
+                raise RuntimeError(msg.format(node.get_conn_id()))
+            else:
+                msg = "Node {0} would be excluded - can't connect"
+                logger.warning(msg.format(node.get_conn_id()))
+                all_ok = False
+
+    if all_ok:
+        logger.info("All nodes connected successfully")
+
+    ctx.nodes = [node for node in ctx.nodes
+                 if node.connection is not None]
 
 
 def discover_stage(cfg, ctx):
@@ -198,102 +192,17 @@
 
         nodes, clean_data = discover(ctx, discover_objs,
                                      cfg['clouds'], cfg['var_dir'])
-        ctx.clear_calls_stack.append(make_undiscover_stage(clean_data))
+
+        def undiscover_stage(cfg, ctx):
+            undiscover(clean_data)
+
+        ctx.clear_calls_stack.append(undiscover_stage)
         ctx.nodes.extend(nodes)
 
     for url, roles in cfg.get('explicit_nodes', {}).items():
         ctx.nodes.append(Node(url, roles.split(",")))
 
 
-def deploy_sensors_stage(cfg_dict, ctx):
-    if 'sensors' not in cfg_dict:
-        return
-
-    cfg = cfg_dict.get('sensors')
-
-    sensors_configs = []
-    monitored_nodes = []
-
-    for role, sensors_str in cfg["roles_mapping"].items():
-        sensors = [sens.strip() for sens in sensors_str.split(",")]
-
-        collect_cfg = dict((sensor, {}) for sensor in sensors)
-
-        for node in ctx.nodes:
-            if role in node.roles:
-                monitored_nodes.append(node)
-                sens_cfg = SensorConfig(node.connection,
-                                        node.get_ip(),
-                                        collect_cfg)
-                sensors_configs.append(sens_cfg)
-
-    if len(monitored_nodes) == 0:
-        logger.info("Nothing to monitor, no sensors would be installed")
-        return
-
-    ctx.receiver_uri = cfg["receiver_uri"]
-    nodes_ips = [node.get_ip() for node in monitored_nodes]
-    if '{ip}' in ctx.receiver_uri:
-        ips = set(map(utils.get_ip_for_target, nodes_ips))
-
-        if len(ips) > 1:
-            raise ValueError("Can't select external ip for sensors server")
-
-        if len(ips) == 0:
-            raise ValueError("Can't find any external ip for sensors server")
-
-        ext_ip = list(ips)[0]
-        ctx.receiver_uri = ctx.receiver_uri.format(ip=ext_ip)
-
-    ctx.clear_calls_stack.append(remove_sensors_stage)
-    ctx.sensor_cm = start_monitoring(ctx.receiver_uri, sensors_configs)
-
-    ctx.sensors_control_queue = ctx.sensor_cm.__enter__()
-
-    mon_q = Queue.Queue()
-
-    fd = open(cfg_dict['sensor_storage'], "w")
-    th = threading.Thread(None, save_sensors_data, None,
-                          (ctx.sensors_control_queue, mon_q, fd))
-    th.daemon = True
-    th.start()
-    ctx.sensor_listen_thread = th
-
-    nodes_ips_set = set(nodes_ips)
-    MAX_WAIT_FOR_SENSORS = 10
-    etime = time.time() + MAX_WAIT_FOR_SENSORS
-
-    msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
-    logger.debug(msg.format(MAX_WAIT_FOR_SENSORS, len(nodes_ips_set)))
-
-    # wait till all nodes start sending data
-    while len(nodes_ips_set) != 0:
-        tleft = etime - time.time()
-        try:
-            data = mon_q.get(True, tleft)
-            ip, port = data
-        except Queue.Empty:
-            msg = "Node {0} not sending any sensor data in {1}s"
-            msg = msg.format(", ".join(nodes_ips_set), MAX_WAIT_FOR_SENSORS)
-            raise RuntimeError(msg)
-
-        if ip not in nodes_ips_set:
-            logger.warning("Receive sensors from extra node: {0}".format(ip))
-
-        nodes_ips_set.remove(ip)
-
-
-def remove_sensors_stage(cfg, ctx):
-    if ctx.sensor_cm is not None:
-        ctx.sensor_cm.__exit__(None, None, None)
-
-        if ctx.sensors_control_queue is not None:
-            ctx.sensors_control_queue.put(None)
-
-        if ctx.sensor_listen_thread is not None:
-            ctx.sensor_listen_thread.join()
-
-
 def get_os_credentials(cfg, ctx, creds_type):
     creds = None
 
@@ -328,6 +237,38 @@
     return creds
 
 
+@contextlib.contextmanager
+def create_vms_ctx(ctx, cfg, config):
+    params = config['vm_params'].copy()
+    os_nodes_ids = []
+
+    os_creds_type = config['creds']
+    os_creds = get_os_credentials(cfg, ctx, os_creds_type)
+
+    start_vms.nova_connect(**os_creds)
+
+    logger.info("Preparing openstack")
+    start_vms.prepare_os_subpr(**os_creds)
+
+    new_nodes = []
+    try:
+        params['group_name'] = cfg_dict['run_uuid']
+        for new_node, node_id in start_vms.launch_vms(params):
+            new_node.roles.append('testnode')
+            ctx.nodes.append(new_node)
+            os_nodes_ids.append(node_id)
+            new_nodes.append(new_node)
+
+        store_nodes_in_log(cfg, os_nodes_ids)
+        ctx.openstack_nodes_ids = os_nodes_ids
+
+        yield new_nodes
+
+    finally:
+        if not cfg['keep_vm']:
+            shut_down_vms_stage(cfg, ctx)
+
+
 def run_tests_stage(cfg, ctx):
     ctx.results = []
 
@@ -340,54 +281,21 @@
         key, config = group.items()[0]
 
         if 'start_test_nodes' == key:
-            params = config['vm_params'].copy()
-            os_nodes_ids = []
+            with create_vms_ctx(ctx, cfg, config) as new_nodes:
+                connect_all(new_nodes, True)
 
-            os_creds_type = config['creds']
-            os_creds = get_os_credentials(cfg, ctx, os_creds_type)
+                for node in new_nodes:
+                    if node.connection is None:
+                        msg = "Failed to connect to vm {0}"
+                        raise RuntimeError(msg.format(node.get_conn_id()))
 
-            start_vms.nova_connect(**os_creds)
-
-            logger.info("Preparing openstack")
-            start_vms.prepare_os_subpr(**os_creds)
-
-            new_nodes = []
-            try:
-                params['group_name'] = cfg_dict['run_uuid']
-                for new_node, node_id in start_vms.launch_vms(params):
-                    new_node.roles.append('testnode')
-                    ctx.nodes.append(new_node)
-                    os_nodes_ids.append(node_id)
-                    new_nodes.append(new_node)
-
-                store_nodes_in_log(cfg, os_nodes_ids)
-                ctx.openstack_nodes_ids = os_nodes_ids
-
-                connect_all(new_nodes)
-
-                # deploy sensors on new nodes
-                # unify this code
-                if 'sensors' in cfg:
-                    sens_cfg = []
-                    sensors_str = cfg["sensors"]["roles_mapping"]['testnode']
-                    sensors = [sens.strip() for sens in sensors_str.split(",")]
-
-                    collect_cfg = dict((sensor, {}) for sensor in sensors)
-                    for node in new_nodes:
-                        sens_cfg.append((node.connection, collect_cfg))
-
-                    uri = cfg["sensors"]["receiver_uri"]
-                    logger.debug("Installing sensors on vm's")
-                    deploy_and_start_sensors(uri, None,
-                                             connected_config=sens_cfg)
+                deploy_sensors_stage(cfg_dict,
+                                     ctx,
+                                     nodes=new_nodes,
+                                     undeploy=False)
 
                 for test_group in config.get('tests', []):
                     ctx.results.extend(run_tests(test_group, ctx.nodes))
-
-            finally:
-                if not ctx.keep_vm:
-                    shut_down_vms_stage(cfg, ctx)
-
         else:
             ctx.results.extend(run_tests(group, ctx.nodes))
 
@@ -426,22 +334,6 @@
             node.connection.close()
 
 
-def yamable(data):
-    if isinstance(data, (tuple, list)):
-        return map(yamable, data)
-
-    if isinstance(data, unicode):
-        return str(data)
-
-    if isinstance(data, dict):
-        res = {}
-        for k, v in data.items():
-            res[yamable(k)] = yamable(v)
-        return res
-
-    return data
-
-
 def store_raw_results_stage(cfg, ctx):
 
     raw_results = os.path.join(cfg_dict['var_dir'], 'raw_results.yaml')
@@ -451,7 +343,7 @@
     else:
         cont = []
 
-    cont.extend(yamable(ctx.results))
+    cont.extend(utils.yamable(ctx.results))
     raw_data = pretty_yaml.dumps(cont)
 
     with open(raw_results, "w") as fd:
@@ -564,7 +456,7 @@
     ctx.build_meta['build_descrption'] = opts.build_description
     ctx.build_meta['build_type'] = opts.build_type
     ctx.build_meta['username'] = opts.username
-    ctx.keep_vm = opts.keep_vm
+    cfg_dict['keep_vm'] = opts.keep_vm
 
     try:
         for stage in stages: