fixing, improve sersors installation code
diff --git a/local_ceph.yaml b/local_ceph.yaml
index e945a06..fc54189 100644
--- a/local_ceph.yaml
+++ b/local_ceph.yaml
@@ -10,7 +10,7 @@
     var_dir_root: /tmp/perf_tests
 
 sensors:
-    receiver_uri: "udp://{ip}:5699"
+    receiver_url: "udp://{ip}:5699"
     roles_mapping:
         ceph-osd: block-io
         cinder: block-io, system-cpu
diff --git a/perf1.yaml b/perf1.yaml
index ecfd26b..fed57f5 100644
--- a/perf1.yaml
+++ b/perf1.yaml
@@ -1,6 +1,6 @@
 clouds:
     fuel:
-        url: http://172.6.52.112:8000/
+        url: http://172.16.52.112:8000/
         creds: admin:admin@admin
         ssh_creds: root:test37
         openstack_env: test
@@ -14,8 +14,7 @@
     var_dir_root: /tmp/perf_tests
 
 sensors:
-    # receiver_uri: udp://172.18.217.10:5699
-    receiver_uri: "udp://{ip}:5699"
+    receiver_url: "udp://{ip}:5699"
     roles_mapping:
         ceph-osd: block-io
         cinder: block-io, system-cpu
@@ -41,14 +40,14 @@
             flt_ip_pool: nova
             private_key_path: disk_io_perf.pem
             creds: "ssh://ubuntu@{ip}::{private_key_path}"
-            name_templ: disk_io_perf-{0}
-            scheduler_group_name: disk_io_group_aa-{0}
+            name_templ: wally-{group}-{id}
+            scheduler_group_name: wally-{group}-{id}
             security_group: disk_io_perf
 
         tests:
             - io:
-                cfg: tests/io_scenario_hdd.cfg
-                # cfg: scripts/fio_tests_configs/io_task_test.cfg
+                # cfg: tests/io_scenario_hdd.cfg
+                cfg: scripts/fio_tests_configs/io_task_test.cfg
                 params:
                     FILENAME: /opt/xxx.bin
                     NUM_ROUNDS: 7
diff --git a/usb_hdd.yaml b/usb_hdd.yaml
index 59e1259..224c6f6 100644
--- a/usb_hdd.yaml
+++ b/usb_hdd.yaml
@@ -6,9 +6,7 @@
     var_dir_root: /tmp/perf_tests
 
 sensors:
-    # receiver_uri: udp://172.18.217.10:5699
-    receiver_uri: "udp://192.168.0.108:5699"
-    # receiver_uri: "udp://{ip}:5699"
+    receiver_url: "udp://{ip}:5699"
     roles_mapping:
         ceph-osd: block-io
         cinder: block-io, system-cpu
diff --git a/vEnv-3-2.yaml b/vEnv-3-2.yaml
index 40ba677..385013a 100644
--- a/vEnv-3-2.yaml
+++ b/vEnv-3-2.yaml
@@ -13,13 +13,12 @@
 internal:
     var_dir_root: /tmp/perf_tests
 
-sensors:
-    # receiver_uri: udp://172.18.217.10:5699
-    receiver_uri: "udp://{ip}:5699"
-    roles_mapping:
-        ceph-osd: block-io
-        cinder: block-io, system-cpu
-        testnode: system-cpu, block-io
+# sensors:
+#     receiver_url: "udp://{ip}:5699"
+#     roles_mapping:
+#         ceph-osd: block-io
+#         cinder: block-io, system-cpu
+#         testnode: system-cpu, block-io
 
 tests:
     - start_test_nodes:
diff --git a/wally/config.py b/wally/config.py
index 03b7ac9..7dba134 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -108,7 +108,7 @@
     sh = logging.StreamHandler()
     sh.setLevel(def_level)
 
-    log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
+    log_format = '%(asctime)s - %(levelname)s - %(name)-15s - %(message)s'
     colored_formatter = ColoredFormatter(log_format, datefmt="%H:%M:%S")
 
     sh.setFormatter(colored_formatter)
@@ -118,7 +118,7 @@
 
     if log_fname is not None:
         fh = logging.FileHandler(log_fname)
-        log_format = '%(asctime)s - %(levelname)8s - %(name)s - %(message)s'
+        log_format = '%(asctime)s - %(levelname)8s - %(name)-15s - %(message)s'
         formatter = logging.Formatter(log_format, datefmt="%H:%M:%S")
         fh.setFormatter(formatter)
         fh.setLevel(logging.DEBUG)
diff --git a/wally/discover/discover.py b/wally/discover/discover.py
index 8c78387..3cd5cff 100644
--- a/wally/discover/discover.py
+++ b/wally/discover/discover.py
@@ -1,3 +1,4 @@
+import os.path
 import logging
 
 from . import ceph
@@ -9,6 +10,23 @@
 logger = logging.getLogger("wally.discover")
 
 
+openrc_templ = """#!/bin/sh
+export LC_ALL=C
+export OS_NO_CACHE='true'
+export OS_TENANT_NAME='{tenant}'
+export OS_USERNAME='{name}'
+export OS_PASSWORD='{passwd}'
+export OS_AUTH_URL='{auth_url}'
+export OS_AUTH_STRATEGY='keystone'
+export OS_REGION_NAME='RegionOne'
+export CINDER_ENDPOINT_TYPE='publicURL'
+export GLANCE_ENDPOINT_TYPE='publicURL'
+export KEYSTONE_ENDPOINT_TYPE='publicURL'
+export NOVA_ENDPOINT_TYPE='publicURL'
+export NEUTRON_ENDPOINT_TYPE='publicURL'
+"""
+
+
 def discover(ctx, discover, clusters_info, var_dir):
     nodes_to_run = []
     clean_data = None
@@ -38,7 +56,6 @@
             nodes_to_run.extend(os_nodes)
 
         elif cluster == "fuel":
-
             res = fuel.discover_fuel_nodes(clusters_info['fuel'], var_dir)
             nodes, clean_data, openrc_dict = res
 
@@ -47,6 +64,17 @@
                                         'tenant': openrc_dict['tenant_name'],
                                         'auth_url': openrc_dict['os_auth_url']}
 
+            env_name = clusters_info['fuel']['openstack_env']
+            env_f_name = env_name
+            for char in "-+ {}()[]":
+                env_f_name = env_f_name.replace(char, '_')
+
+            fuel_openrc_fname = os.path.join(var_dir,
+                                             env_f_name + "_openrc")
+            with open(fuel_openrc_fname, "w") as fd:
+                fd.write(openrc_templ.format(**ctx.fuel_openstack_creds))
+            msg = "Openrc for cluster {0} saves into {1}"
+            logger.debug(msg.format(env_name, fuel_openrc_fname))
             nodes_to_run.extend(nodes)
 
         elif cluster == "ceph":
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
index 6e76188..149ec31 100644
--- a/wally/discover/fuel.py
+++ b/wally/discover/fuel.py
@@ -15,7 +15,7 @@
 
 
 logger = logging.getLogger("wally.discover")
-BASE_PF_PORT = 33467
+BASE_PF_PORT = 44006
 
 
 def discover_fuel_nodes(fuel_data, var_dir):
@@ -32,7 +32,7 @@
 
     fuel_nodes = list(cluster.get_nodes())
 
-    logger.debug("Found FUEL {0}".format("".join(map(str, version))))
+    logger.debug("Found FUEL {0}".format(".".join(map(str, version))))
 
     network = 'fuelweb_admin' if version >= [6, 0] else 'admin'
 
@@ -60,15 +60,17 @@
         conn_url = "ssh://root@{0}:{1}:{2}".format(fuel_host,
                                                    port,
                                                    fuel_key_file)
-        nodes.append(Node(conn_url, fuel_node['roles']))
+        node = Node(conn_url, fuel_node['roles'])
+        node.monitor_url = None
+        nodes.append(node)
         ips_ports.append((ip, port))
 
     logger.debug("Found %s fuel nodes for env %r" %
                  (len(nodes), fuel_data['openstack_env']))
 
-    return ([],
-            (ssh_conn, fuel_ext_iface, ips_ports),
-            cluster.get_openrc())
+    # return ([],
+    #         (ssh_conn, fuel_ext_iface, ips_ports),
+    #         cluster.get_openrc())
 
     return (nodes,
             (ssh_conn, fuel_ext_iface, ips_ports),
diff --git a/wally/discover/node.py b/wally/discover/node.py
index fad4f29..dc1c9b0 100644
--- a/wally/discover/node.py
+++ b/wally/discover/node.py
@@ -7,10 +7,20 @@
         self.roles = roles
         self.conn_url = conn_url
         self.connection = None
+        self.monitor_url = None
 
     def get_ip(self):
         return urlparse.urlparse(self.conn_url).hostname
 
+    def get_conn_id(self):
+        host = urlparse.urlparse(self.conn_url).hostname
+        port = urlparse.urlparse(self.conn_url).port
+
+        if port is None:
+            port = 22
+
+        return host + ":" + str(port)
+
     def __str__(self):
         templ = "<Node: url={conn_url!r} roles={roles}" + \
                 " connected={is_connected}>"
diff --git a/wally/run_test.py b/wally/run_test.py
index 7899bae..42e96ff 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -2,25 +2,24 @@
 
 import os
 import sys
-import time
 import Queue
 import pprint
 import logging
 import argparse
+import functools
 import threading
+import contextlib
 import collections
 
 import yaml
 from concurrent.futures import ThreadPoolExecutor
 
 from wally import pretty_yaml
+from wally.tests_sensors import deploy_sensors_stage
 from wally.discover import discover, Node, undiscover
 from wally import utils, report, ssh_utils, start_vms
 from wally.suits.itest import IOPerfTest, PgBenchTest
 from wally.config import cfg_dict, load_config, setup_loggers
-from wally.sensors.api import (start_monitoring,
-                               deploy_and_start_sensors,
-                               SensorConfig)
 
 
 logger = logging.getLogger("wally")
@@ -40,54 +39,37 @@
         self.nodes = []
         self.clear_calls_stack = []
         self.openstack_nodes_ids = []
-        self.sensor_cm = None
-        self.keep_vm = False
-        self.sensors_control_queue = None
-        self.sensor_listen_thread = None
+        self.sensors_mon_q = None
 
 
-def connect_one(node):
+def connect_one(node, vm=False):
     try:
         ssh_pref = "ssh://"
         if node.conn_url.startswith(ssh_pref):
             url = node.conn_url[len(ssh_pref):]
-            logger.debug("Try connect to " + url)
-            node.connection = ssh_utils.connect(url)
+
+            if vm:
+                ret_count = 24
+                log_warns = False
+            else:
+                ret_count = 3
+                log_warns = True
+
+            node.connection = ssh_utils.connect(url,
+                                                retry_count=ret_count,
+                                                log_warns=log_warns)
         else:
             raise ValueError("Unknown url type {0}".format(node.conn_url))
     except Exception:
-        logger.exception("During connect to {0}".format(node))
-        raise
+        logger.exception("During connect to " + node.get_conn_id())
+        node.connection = None
 
 
-def connect_all(nodes):
+def connect_all(nodes, vm=False):
     logger.info("Connecting to nodes")
     with ThreadPoolExecutor(32) as pool:
-        list(pool.map(connect_one, nodes))
-    logger.info("All nodes connected successfully")
-
-
-def save_sensors_data(q, mon_q, fd):
-    logger.info("Start receiving sensors data")
-    fd.write("\n")
-
-    observed_nodes = set()
-
-    try:
-        while True:
-            val = q.get()
-            if val is None:
-                break
-
-            addr, data = val
-            if addr not in observed_nodes:
-                mon_q.put(addr)
-                observed_nodes.add(addr)
-
-            fd.write("{0!s} : {1!r}\n".format(time.time(), repr(val)))
-    except Exception:
-        logger.exception("Error in sensors thread")
-    logger.info("Sensors thread exits")
+        connect_one_f = functools.partial(connect_one, vm=vm)
+        list(pool.map(connect_one_f, nodes))
 
 
 def test_thread(test, node, barrier, res_q):
@@ -185,11 +167,23 @@
     ctx.clear_calls_stack.append(disconnect_stage)
     connect_all(ctx.nodes)
 
+    all_ok = True
 
-def make_undiscover_stage(clean_data):
-    def undiscover_stage(cfg, ctx):
-        undiscover(clean_data)
-    return undiscover_stage
+    for node in ctx.nodes:
+        if node.connection is None:
+            if 'testnode' in node.roles:
+                msg = "Can't connect to testnode {0}"
+                raise RuntimeError(msg.format(node.get_conn_id()))
+            else:
+                msg = "Node {0} would be excluded - can't connect"
+                logger.warning(msg.format(node.get_conn_id()))
+                all_ok = False
+
+    if all_ok:
+        logger.info("All nodes connected successfully")
+
+    ctx.nodes = [node for node in ctx.nodes
+                 if node.connection is not None]
 
 
 def discover_stage(cfg, ctx):
@@ -198,102 +192,17 @@
 
         nodes, clean_data = discover(ctx, discover_objs,
                                      cfg['clouds'], cfg['var_dir'])
-        ctx.clear_calls_stack.append(make_undiscover_stage(clean_data))
+
+        def undiscover_stage(cfg, ctx):
+            undiscover(clean_data)
+
+        ctx.clear_calls_stack.append(undiscover_stage)
         ctx.nodes.extend(nodes)
 
     for url, roles in cfg.get('explicit_nodes', {}).items():
         ctx.nodes.append(Node(url, roles.split(",")))
 
 
-def deploy_sensors_stage(cfg_dict, ctx):
-    if 'sensors' not in cfg_dict:
-        return
-
-    cfg = cfg_dict.get('sensors')
-
-    sensors_configs = []
-    monitored_nodes = []
-
-    for role, sensors_str in cfg["roles_mapping"].items():
-        sensors = [sens.strip() for sens in sensors_str.split(",")]
-
-        collect_cfg = dict((sensor, {}) for sensor in sensors)
-
-        for node in ctx.nodes:
-            if role in node.roles:
-                monitored_nodes.append(node)
-                sens_cfg = SensorConfig(node.connection,
-                                        node.get_ip(),
-                                        collect_cfg)
-                sensors_configs.append(sens_cfg)
-
-    if len(monitored_nodes) == 0:
-        logger.info("Nothing to monitor, no sensors would be installed")
-        return
-
-    ctx.receiver_uri = cfg["receiver_uri"]
-    nodes_ips = [node.get_ip() for node in monitored_nodes]
-    if '{ip}' in ctx.receiver_uri:
-        ips = set(map(utils.get_ip_for_target, nodes_ips))
-
-        if len(ips) > 1:
-            raise ValueError("Can't select external ip for sensors server")
-
-        if len(ips) == 0:
-            raise ValueError("Can't find any external ip for sensors server")
-
-        ext_ip = list(ips)[0]
-        ctx.receiver_uri = ctx.receiver_uri.format(ip=ext_ip)
-
-    ctx.clear_calls_stack.append(remove_sensors_stage)
-    ctx.sensor_cm = start_monitoring(ctx.receiver_uri, sensors_configs)
-
-    ctx.sensors_control_queue = ctx.sensor_cm.__enter__()
-
-    mon_q = Queue.Queue()
-
-    fd = open(cfg_dict['sensor_storage'], "w")
-    th = threading.Thread(None, save_sensors_data, None,
-                          (ctx.sensors_control_queue, mon_q, fd))
-    th.daemon = True
-    th.start()
-    ctx.sensor_listen_thread = th
-
-    nodes_ips_set = set(nodes_ips)
-    MAX_WAIT_FOR_SENSORS = 10
-    etime = time.time() + MAX_WAIT_FOR_SENSORS
-
-    msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
-    logger.debug(msg.format(MAX_WAIT_FOR_SENSORS, len(nodes_ips_set)))
-
-    # wait till all nodes start sending data
-    while len(nodes_ips_set) != 0:
-        tleft = etime - time.time()
-        try:
-            data = mon_q.get(True, tleft)
-            ip, port = data
-        except Queue.Empty:
-            msg = "Node {0} not sending any sensor data in {1}s"
-            msg = msg.format(", ".join(nodes_ips_set), MAX_WAIT_FOR_SENSORS)
-            raise RuntimeError(msg)
-
-        if ip not in nodes_ips_set:
-            logger.warning("Receive sensors from extra node: {0}".format(ip))
-
-        nodes_ips_set.remove(ip)
-
-
-def remove_sensors_stage(cfg, ctx):
-    if ctx.sensor_cm is not None:
-        ctx.sensor_cm.__exit__(None, None, None)
-
-        if ctx.sensors_control_queue is not None:
-            ctx.sensors_control_queue.put(None)
-
-        if ctx.sensor_listen_thread is not None:
-            ctx.sensor_listen_thread.join()
-
-
 def get_os_credentials(cfg, ctx, creds_type):
     creds = None
 
@@ -328,6 +237,38 @@
     return creds
 
 
+@contextlib.contextmanager
+def create_vms_ctx(ctx, cfg, config):
+    params = config['vm_params'].copy()
+    os_nodes_ids = []
+
+    os_creds_type = config['creds']
+    os_creds = get_os_credentials(cfg, ctx, os_creds_type)
+
+    start_vms.nova_connect(**os_creds)
+
+    logger.info("Preparing openstack")
+    start_vms.prepare_os_subpr(**os_creds)
+
+    new_nodes = []
+    try:
+        params['group_name'] = cfg_dict['run_uuid']
+        for new_node, node_id in start_vms.launch_vms(params):
+            new_node.roles.append('testnode')
+            ctx.nodes.append(new_node)
+            os_nodes_ids.append(node_id)
+            new_nodes.append(new_node)
+
+        store_nodes_in_log(cfg, os_nodes_ids)
+        ctx.openstack_nodes_ids = os_nodes_ids
+
+        yield new_nodes
+
+    finally:
+        if not cfg['keep_vm']:
+            shut_down_vms_stage(cfg, ctx)
+
+
 def run_tests_stage(cfg, ctx):
     ctx.results = []
 
@@ -340,54 +281,21 @@
         key, config = group.items()[0]
 
         if 'start_test_nodes' == key:
-            params = config['vm_params'].copy()
-            os_nodes_ids = []
+            with create_vms_ctx(ctx, cfg, config) as new_nodes:
+                connect_all(new_nodes, True)
 
-            os_creds_type = config['creds']
-            os_creds = get_os_credentials(cfg, ctx, os_creds_type)
+                for node in new_nodes:
+                    if node.connection is None:
+                        msg = "Failed to connect to vm {0}"
+                        raise RuntimeError(msg.format(node.get_conn_id()))
 
-            start_vms.nova_connect(**os_creds)
-
-            logger.info("Preparing openstack")
-            start_vms.prepare_os_subpr(**os_creds)
-
-            new_nodes = []
-            try:
-                params['group_name'] = cfg_dict['run_uuid']
-                for new_node, node_id in start_vms.launch_vms(params):
-                    new_node.roles.append('testnode')
-                    ctx.nodes.append(new_node)
-                    os_nodes_ids.append(node_id)
-                    new_nodes.append(new_node)
-
-                store_nodes_in_log(cfg, os_nodes_ids)
-                ctx.openstack_nodes_ids = os_nodes_ids
-
-                connect_all(new_nodes)
-
-                # deploy sensors on new nodes
-                # unify this code
-                if 'sensors' in cfg:
-                    sens_cfg = []
-                    sensors_str = cfg["sensors"]["roles_mapping"]['testnode']
-                    sensors = [sens.strip() for sens in sensors_str.split(",")]
-
-                    collect_cfg = dict((sensor, {}) for sensor in sensors)
-                    for node in new_nodes:
-                        sens_cfg.append((node.connection, collect_cfg))
-
-                    uri = cfg["sensors"]["receiver_uri"]
-                    logger.debug("Installing sensors on vm's")
-                    deploy_and_start_sensors(uri, None,
-                                             connected_config=sens_cfg)
+                deploy_sensors_stage(cfg_dict,
+                                     ctx,
+                                     nodes=new_nodes,
+                                     undeploy=False)
 
                 for test_group in config.get('tests', []):
                     ctx.results.extend(run_tests(test_group, ctx.nodes))
-
-            finally:
-                if not ctx.keep_vm:
-                    shut_down_vms_stage(cfg, ctx)
-
         else:
             ctx.results.extend(run_tests(group, ctx.nodes))
 
@@ -426,22 +334,6 @@
             node.connection.close()
 
 
-def yamable(data):
-    if isinstance(data, (tuple, list)):
-        return map(yamable, data)
-
-    if isinstance(data, unicode):
-        return str(data)
-
-    if isinstance(data, dict):
-        res = {}
-        for k, v in data.items():
-            res[yamable(k)] = yamable(v)
-        return res
-
-    return data
-
-
 def store_raw_results_stage(cfg, ctx):
 
     raw_results = os.path.join(cfg_dict['var_dir'], 'raw_results.yaml')
@@ -451,7 +343,7 @@
     else:
         cont = []
 
-    cont.extend(yamable(ctx.results))
+    cont.extend(utils.yamable(ctx.results))
     raw_data = pretty_yaml.dumps(cont)
 
     with open(raw_results, "w") as fd:
@@ -564,7 +456,7 @@
     ctx.build_meta['build_descrption'] = opts.build_description
     ctx.build_meta['build_type'] = opts.build_type
     ctx.build_meta['username'] = opts.username
-    ctx.keep_vm = opts.keep_vm
+    cfg_dict['keep_vm'] = opts.keep_vm
 
     try:
         for stage in stages:
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
index f66bb36..6eed90a 100644
--- a/wally/sensors/api.py
+++ b/wally/sensors/api.py
@@ -1,24 +1,31 @@
 import Queue
+import logging
 import threading
-from contextlib import contextmanager
 
 from .deploy_sensors import (deploy_and_start_sensors,
                              stop_and_remove_sensors)
 from .protocol import create_protocol, Timeout
 
 
-__all__ = ['Empty', 'recv_main', 'start_monitoring',
-           'deploy_and_start_sensors', 'SensorConfig']
+__all__ = ['Empty', 'recv_main',
+           'deploy_and_start_sensors',
+           'SensorConfig',
+           'stop_and_remove_sensors',
+           'start_listener_thread',
+           ]
 
 
 Empty = Queue.Empty
+logger = logging.getLogger("wally.sensors")
 
 
 class SensorConfig(object):
-    def __init__(self, conn, url, sensors):
+    def __init__(self, conn, url, sensors, source_id, monitor_url=None):
         self.conn = conn
         self.url = url
         self.sensors = sensors
+        self.source_id = source_id
+        self.monitor_url = monitor_url
 
 
 def recv_main(proto, data_q, cmd_q):
@@ -38,21 +45,17 @@
             pass
 
 
-@contextmanager
-def start_monitoring(uri, configs):
-    deploy_and_start_sensors(uri, configs)
-    try:
-        data_q = Queue.Queue()
-        cmd_q = Queue.Queue()
-        proto = create_protocol(uri, receiver=True)
-        th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
-        th.daemon = True
-        th.start()
+def start_listener_thread(uri):
+    data_q = Queue.Queue()
+    cmd_q = Queue.Queue()
+    logger.debug("Listening for sensor data on " + uri)
+    proto = create_protocol(uri, receiver=True)
+    th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
+    th.daemon = True
+    th.start()
 
-        try:
-            yield data_q
-        finally:
-            cmd_q.put(None)
-            th.join()
-    finally:
-        stop_and_remove_sensors(configs)
+    def stop_thread():
+        cmd_q.put(None)
+        th.join()
+
+    return data_q, stop_thread
diff --git a/wally/sensors/cp_protocol.py b/wally/sensors/cp_protocol.py
index 4e96afe..1b6993c 100644
--- a/wally/sensors/cp_protocol.py
+++ b/wally/sensors/cp_protocol.py
@@ -8,7 +8,7 @@
 import binascii
 
 
-logger = logging.getLogger("wally")
+logger = logging.getLogger("wally.sensors")
 
 
 # protocol contains 2 type of packet:
diff --git a/wally/sensors/deploy_sensors.py b/wally/sensors/deploy_sensors.py
index 249adfb..73e7902 100644
--- a/wally/sensors/deploy_sensors.py
+++ b/wally/sensors/deploy_sensors.py
@@ -7,14 +7,14 @@
 
 from wally.ssh_utils import copy_paths, run_over_ssh
 
-logger = logging.getLogger('wally')
+logger = logging.getLogger('wally.sensors')
 
 
 def wait_all_ok(futures):
     return all(future.result() for future in futures)
 
 
-def deploy_and_start_sensors(monitor_uri, sensor_configs,
+def deploy_and_start_sensors(sensor_configs,
                              remote_path='/tmp/sensors/sensors'):
 
     paths = {os.path.dirname(__file__): remote_path}
@@ -25,29 +25,29 @@
             futures.append(executor.submit(deploy_and_start_sensor,
                                            paths,
                                            node_sensor_config,
-                                           monitor_uri,
                                            remote_path))
 
         if not wait_all_ok(futures):
             raise RuntimeError("Sensor deployment fails on some nodes")
 
 
-def deploy_and_start_sensor(paths, node_sensor_config,
-                            monitor_uri, remote_path):
+def deploy_and_start_sensor(paths, node_sensor_config, remote_path):
     try:
         copy_paths(node_sensor_config.conn, paths)
         sftp = node_sensor_config.conn.open_sftp()
 
         config_remote_path = os.path.join(remote_path, "conf.json")
 
+        sensors_config = node_sensor_config.sensors.copy()
+        sensors_config['source_id'] = node_sensor_config.source_id
         with sftp.open(config_remote_path, "w") as fd:
-            fd.write(json.dumps(node_sensor_config.sensors))
+            fd.write(json.dumps(sensors_config))
 
         cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
                     "sensors.main -d start -u {1} {2}"
 
         cmd = cmd_templ.format(os.path.dirname(remote_path),
-                               monitor_uri,
+                               node_sensor_config.monitor_url,
                                config_remote_path)
 
         run_over_ssh(node_sensor_config.conn, cmd,
@@ -55,7 +55,7 @@
         sftp.close()
 
     except:
-        msg = "During deploing sensors in {0}".format(node_sensor_config.url)
+        msg = "During deploing sensors on {0}".format(node_sensor_config.url)
         logger.exception(msg)
         return False
     return True
@@ -69,9 +69,8 @@
     # some magic
     time.sleep(0.3)
 
-    conn.exec_command("rm -rf {0}".format(remote_path))
-
-    logger.debug("Sensors stopped and removed")
+    # logger.warning("Sensors don't removed")
+    run_over_ssh(conn, "rm -rf {0}".format(remote_path), node=url)
 
 
 def stop_and_remove_sensors(configs, remote_path='/tmp/sensors'):
@@ -85,3 +84,4 @@
                                            remote_path))
 
         wait(futures)
+    logger.debug("Sensors stopped and removed")
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
index 3753e7c..e86bbed 100644
--- a/wally/sensors/main.py
+++ b/wally/sensors/main.py
@@ -51,6 +51,11 @@
     prev = {}
 
     while True:
+        try:
+            source_id = str(required_sensors.pop('source_id'))
+        except KeyError:
+            source_id = None
+
         gtime, data = get_values(required_sensors.items())
         curr = {'time': SensorInfo(gtime, True)}
         for name, val in data.items():
@@ -60,6 +65,10 @@
                 prev[name] = val.value
             else:
                 curr[name] = SensorInfo(val.value, False)
+
+        if source_id is not None:
+            curr['source_id'] = source_id
+
         sender.send(curr)
         time.sleep(opts.timeout)
 
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
index c2ace01..7688f31 100644
--- a/wally/sensors/protocol.py
+++ b/wally/sensors/protocol.py
@@ -25,7 +25,12 @@
 
 class PickleSerializer(ISensortResultsSerializer):
     def pack(self, data):
-        ndata = {key: val.value for key, val in data.items()}
+        ndata = {}
+        for key, val in data.items():
+            if isinstance(val, basestring):
+                ndata[key] = val
+            else:
+                ndata[key] = val.value
         return pickle.dumps(ndata)
 
     def unpack(self, data):
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index 68d4017..0de7816 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -13,11 +13,12 @@
 logger = logging.getLogger("wally")
 
 
-def ssh_connect(creds, retry_count=6, timeout=10):
+def ssh_connect(creds, retry_count=6, timeout=10, log_warns=True):
     ssh = paramiko.SSHClient()
     ssh.load_host_keys('/dev/null')
     ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
     ssh.known_hosts = None
+
     for i in range(retry_count):
         try:
             if creds.user is None:
@@ -55,10 +56,20 @@
             # raise ValueError("Wrong credentials {0}".format(creds.__dict__))
         except paramiko.PasswordRequiredException:
             raise
-        except socket.error as err:
-            print err
-            if i == retry_count - 1:
+        except socket.error:
+            retry_left = retry_count - i - 1
+
+            if log_warns:
+                msg = "Node {0.host}:{0.port} connection timeout."
+
+                if 0 != retry_left:
+                    msg += " {0} retry left.".format(retry_left)
+
+                logger.warning(msg.format(creds))
+
+            if 0 == retry_left:
                 raise
+
             time.sleep(1)
 
 
@@ -77,8 +88,12 @@
         try:
             sftp.mkdir(remotepath, mode=mode)
         except IOError:
-            ssh_mkdir(sftp, remotepath.rsplit("/", 1)[0], mode=mode,
-                      intermediate=True)
+            upper_dir = remotepath.rsplit("/", 1)[0]
+
+            if upper_dir == '' or upper_dir == '/':
+                raise
+
+            ssh_mkdir(sftp, upper_dir, mode=mode, intermediate=True)
             return sftp.mkdir(remotepath, mode=mode)
     else:
         sftp.mkdir(remotepath, mode=mode)
@@ -155,12 +170,10 @@
                 else:
                     templ = "Can't copy {0!r} - " + \
                             "it neither a file not a directory"
-                    msg = templ.format(src)
-                    raise OSError(msg)
+                    raise OSError(templ.format(src))
             except Exception as exc:
                 tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
-                msg = tmpl.format(src, dst, exc)
-                raise OSError(msg)
+                raise OSError(tmpl.format(src, dst, exc))
     finally:
         sftp.close()
 
@@ -234,10 +247,10 @@
     raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
 
 
-def connect(uri):
+def connect(uri, **params):
     creds = parse_ssh_uri(uri)
     creds.port = int(creds.port)
-    return ssh_connect(creds)
+    return ssh_connect(creds, **params)
 
 
 all_sessions_lock = threading.Lock()
diff --git a/wally/start_vms.py b/wally/start_vms.py
index 4fd35ae..b3c141a 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -266,7 +266,7 @@
             names.append(name_templ.format(group=group_name, id=i))
 
         futures = []
-        logger.debug("Requesting new vms")
+        logger.debug("Requesting new vm's")
 
         for name, flt_ip in zip(names, ips):
             params = (nova, name, keypair_name, img, fl,
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 91e9dd5..1e93247 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -157,6 +157,8 @@
             run_over_ssh(conn, cmd, timeout=msz, node=self.node)
 
     def run(self, conn, barrier):
+        # logger.warning("No tests runned")
+        # return
         cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
         # cmd_templ = "env python2 {0} --type {1} {2} --json -"
 
diff --git a/wally/tests_sensors.py b/wally/tests_sensors.py
new file mode 100644
index 0000000..1e0b1ad
--- /dev/null
+++ b/wally/tests_sensors.py
@@ -0,0 +1,145 @@
+import time
+import Queue
+import logging
+import threading
+
+from wally import utils
+from wally.config import cfg_dict
+from wally.sensors.api import (start_listener_thread,
+                               deploy_and_start_sensors,
+                               SensorConfig,
+                               stop_and_remove_sensors)
+
+
+logger = logging.getLogger("wally")
+
+
+def save_sensors_data(data_q, mon_q, fd):
+    fd.write("\n")
+
+    observed_nodes = set()
+
+    try:
+        while True:
+            val = data_q.get()
+            if val is None:
+                break
+
+            addr, data = val
+            if addr not in observed_nodes:
+                mon_q.put(addr + (data['source_id'],))
+                observed_nodes.add(addr)
+
+            fd.write("{0!s} : {1!r}\n".format(time.time(), repr(val)))
+    except Exception:
+        logger.exception("Error in sensors thread")
+    logger.info("Sensors thread exits")
+
+
+def get_sensors_config_for_nodes(cfg, nodes):
+    monitored_nodes = []
+    sensors_configs = []
+
+    receiver_url = cfg["receiver_url"]
+    assert '{ip}' in receiver_url
+
+    for role, sensors_str in cfg["roles_mapping"].items():
+        sensors = [sens.strip() for sens in sensors_str.split(",")]
+
+        collect_cfg = dict((sensor, {}) for sensor in sensors)
+
+        for node in nodes:
+            if role in node.roles:
+
+                if node.monitor_url is not None:
+                    monitor_url = node.monitor_url
+                else:
+                    ext_ip = utils.get_ip_for_target(node.get_ip())
+                    monitor_url = receiver_url.format(ip=ext_ip)
+
+                monitored_nodes.append(node)
+                sens_cfg = SensorConfig(node.connection,
+                                        node.get_conn_id(),
+                                        collect_cfg,
+                                        source_id=node.get_conn_id(),
+                                        monitor_url=monitor_url)
+                sensors_configs.append(sens_cfg)
+
+    return monitored_nodes, sensors_configs
+
+
+def start_sensor_process_thread(ctx, cfg, sensors_configs):
+    receiver_url = cfg["receiver_url"]
+    sensors_data_q, stop_sensors_loop = \
+        start_listener_thread(receiver_url.format(ip='0.0.0.0'))
+
+    mon_q = Queue.Queue()
+    fd = open(cfg_dict['sensor_storage'], "w")
+    logger.info("Start sensors data receiving thread")
+    sensor_listen_th = threading.Thread(None, save_sensors_data, None,
+                                        (sensors_data_q, mon_q, fd))
+    sensor_listen_th.daemon = True
+    sensor_listen_th.start()
+
+    def stop_sensors_receiver(cfg, ctx):
+        stop_sensors_loop()
+        sensors_data_q.put(None)
+        sensor_listen_th.join()
+
+    ctx.clear_calls_stack.append(stop_sensors_receiver)
+    return mon_q
+
+
+def deploy_sensors_stage(cfg, ctx, nodes=None, undeploy=True):
+    if 'sensors' not in cfg:
+        return
+
+    cfg = cfg.get('sensors')
+
+    if nodes is None:
+        nodes = ctx.nodes
+
+    logger.info("Deploing new sensors on {0} node(s)".format(len(nodes)))
+    monitored_nodes, sensors_configs = get_sensors_config_for_nodes(cfg,
+                                                                    nodes)
+
+    if len(monitored_nodes) == 0:
+        logger.info("Nothing to monitor, no sensors would be installed")
+        return
+
+    if ctx.sensors_mon_q is None:
+        ctx.sensors_mon_q = start_sensor_process_thread(ctx, cfg,
+                                                        sensors_configs)
+
+    if undeploy:
+        def remove_sensors_stage(cfg, ctx):
+            stop_and_remove_sensors(sensors_configs)
+        ctx.clear_calls_stack.append(remove_sensors_stage)
+
+    deploy_and_start_sensors(sensors_configs)
+    wait_for_new_sensors_data(ctx, monitored_nodes)
+
+
+def wait_for_new_sensors_data(ctx, monitored_nodes):
+    MAX_WAIT_FOR_SENSORS = 10
+    etime = time.time() + MAX_WAIT_FOR_SENSORS
+
+    msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
+    nodes_ids = set(node.get_conn_id() for node in monitored_nodes)
+    logger.debug(msg.format(MAX_WAIT_FOR_SENSORS, len(nodes_ids)))
+
+    # wait till all nodes start sending data
+    while len(nodes_ids) != 0:
+        tleft = etime - time.time()
+        try:
+            source_id = ctx.sensors_mon_q.get(True, tleft)[2]
+        except Queue.Empty:
+            msg = "Node {0} not sending any sensor data in {1}s"
+            msg = msg.format(", ".join(nodes_ids), MAX_WAIT_FOR_SENSORS)
+            raise RuntimeError(msg)
+
+        if source_id not in nodes_ids:
+            msg = "Receive sensors from extra node: {0}".format(source_id)
+            logger.warning(msg)
+
+        nodes_ids.remove(source_id)
diff --git a/wally/utils.py b/wally/utils.py
index 60645d4..71fbe57 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -122,3 +122,19 @@
     m = (seconds % 3600) // 60
     s = seconds % 60
     return "{0}:{1:02d}:{2:02d}".format(h, m, s)
+
+
+def yamable(data):
+    if isinstance(data, (tuple, list)):
+        return map(yamable, data)
+
+    if isinstance(data, unicode):
+        return str(data)
+
+    if isinstance(data, dict):
+        res = {}
+        for k, v in data.items():
+            res[yamable(k)] = yamable(v)
+        return res
+
+    return data