fixing, improve sersors installation code
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
index f66bb36..6eed90a 100644
--- a/wally/sensors/api.py
+++ b/wally/sensors/api.py
@@ -1,24 +1,31 @@
 import Queue
+import logging
 import threading
-from contextlib import contextmanager
 
 from .deploy_sensors import (deploy_and_start_sensors,
                              stop_and_remove_sensors)
 from .protocol import create_protocol, Timeout
 
 
-__all__ = ['Empty', 'recv_main', 'start_monitoring',
-           'deploy_and_start_sensors', 'SensorConfig']
+__all__ = ['Empty', 'recv_main',
+           'deploy_and_start_sensors',
+           'SensorConfig',
+           'stop_and_remove_sensors',
+           'start_listener_thread',
+           ]
 
 
 Empty = Queue.Empty
+logger = logging.getLogger("wally.sensors")
 
 
 class SensorConfig(object):
-    def __init__(self, conn, url, sensors):
+    def __init__(self, conn, url, sensors, source_id, monitor_url=None):
         self.conn = conn
         self.url = url
         self.sensors = sensors
+        self.source_id = source_id
+        self.monitor_url = monitor_url
 
 
 def recv_main(proto, data_q, cmd_q):
@@ -38,21 +45,17 @@
             pass
 
 
-@contextmanager
-def start_monitoring(uri, configs):
-    deploy_and_start_sensors(uri, configs)
-    try:
-        data_q = Queue.Queue()
-        cmd_q = Queue.Queue()
-        proto = create_protocol(uri, receiver=True)
-        th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
-        th.daemon = True
-        th.start()
+def start_listener_thread(uri):
+    data_q = Queue.Queue()
+    cmd_q = Queue.Queue()
+    logger.debug("Listening for sensor data on " + uri)
+    proto = create_protocol(uri, receiver=True)
+    th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
+    th.daemon = True
+    th.start()
 
-        try:
-            yield data_q
-        finally:
-            cmd_q.put(None)
-            th.join()
-    finally:
-        stop_and_remove_sensors(configs)
+    def stop_thread():
+        cmd_q.put(None)
+        th.join()
+
+    return data_q, stop_thread
diff --git a/wally/sensors/cp_protocol.py b/wally/sensors/cp_protocol.py
index 4e96afe..1b6993c 100644
--- a/wally/sensors/cp_protocol.py
+++ b/wally/sensors/cp_protocol.py
@@ -8,7 +8,7 @@
 import binascii
 
 
-logger = logging.getLogger("wally")
+logger = logging.getLogger("wally.sensors")
 
 
 # protocol contains 2 type of packet:
diff --git a/wally/sensors/deploy_sensors.py b/wally/sensors/deploy_sensors.py
index 249adfb..73e7902 100644
--- a/wally/sensors/deploy_sensors.py
+++ b/wally/sensors/deploy_sensors.py
@@ -7,14 +7,14 @@
 
 from wally.ssh_utils import copy_paths, run_over_ssh
 
-logger = logging.getLogger('wally')
+logger = logging.getLogger('wally.sensors')
 
 
 def wait_all_ok(futures):
     return all(future.result() for future in futures)
 
 
-def deploy_and_start_sensors(monitor_uri, sensor_configs,
+def deploy_and_start_sensors(sensor_configs,
                              remote_path='/tmp/sensors/sensors'):
 
     paths = {os.path.dirname(__file__): remote_path}
@@ -25,29 +25,29 @@
             futures.append(executor.submit(deploy_and_start_sensor,
                                            paths,
                                            node_sensor_config,
-                                           monitor_uri,
                                            remote_path))
 
         if not wait_all_ok(futures):
             raise RuntimeError("Sensor deployment fails on some nodes")
 
 
-def deploy_and_start_sensor(paths, node_sensor_config,
-                            monitor_uri, remote_path):
+def deploy_and_start_sensor(paths, node_sensor_config, remote_path):
     try:
         copy_paths(node_sensor_config.conn, paths)
         sftp = node_sensor_config.conn.open_sftp()
 
         config_remote_path = os.path.join(remote_path, "conf.json")
 
+        sensors_config = node_sensor_config.sensors.copy()
+        sensors_config['source_id'] = node_sensor_config.source_id
         with sftp.open(config_remote_path, "w") as fd:
-            fd.write(json.dumps(node_sensor_config.sensors))
+            fd.write(json.dumps(sensors_config))
 
         cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
                     "sensors.main -d start -u {1} {2}"
 
         cmd = cmd_templ.format(os.path.dirname(remote_path),
-                               monitor_uri,
+                               node_sensor_config.monitor_url,
                                config_remote_path)
 
         run_over_ssh(node_sensor_config.conn, cmd,
@@ -55,7 +55,7 @@
         sftp.close()
 
     except:
-        msg = "During deploing sensors in {0}".format(node_sensor_config.url)
+        msg = "During deploing sensors on {0}".format(node_sensor_config.url)
         logger.exception(msg)
         return False
     return True
@@ -69,9 +69,8 @@
     # some magic
     time.sleep(0.3)
 
-    conn.exec_command("rm -rf {0}".format(remote_path))
-
-    logger.debug("Sensors stopped and removed")
+    # logger.warning("Sensors don't removed")
+    run_over_ssh(conn, "rm -rf {0}".format(remote_path), node=url)
 
 
 def stop_and_remove_sensors(configs, remote_path='/tmp/sensors'):
@@ -85,3 +84,4 @@
                                            remote_path))
 
         wait(futures)
+    logger.debug("Sensors stopped and removed")
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
index 3753e7c..e86bbed 100644
--- a/wally/sensors/main.py
+++ b/wally/sensors/main.py
@@ -51,6 +51,11 @@
     prev = {}
 
     while True:
+        try:
+            source_id = str(required_sensors.pop('source_id'))
+        except KeyError:
+            source_id = None
+
         gtime, data = get_values(required_sensors.items())
         curr = {'time': SensorInfo(gtime, True)}
         for name, val in data.items():
@@ -60,6 +65,10 @@
                 prev[name] = val.value
             else:
                 curr[name] = SensorInfo(val.value, False)
+
+        if source_id is not None:
+            curr['source_id'] = source_id
+
         sender.send(curr)
         time.sleep(opts.timeout)
 
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
index c2ace01..7688f31 100644
--- a/wally/sensors/protocol.py
+++ b/wally/sensors/protocol.py
@@ -25,7 +25,12 @@
 
 class PickleSerializer(ISensortResultsSerializer):
     def pack(self, data):
-        ndata = {key: val.value for key, val in data.items()}
+        ndata = {}
+        for key, val in data.items():
+            if isinstance(val, basestring):
+                ndata[key] = val
+            else:
+                ndata[key] = val.value
         return pickle.dumps(ndata)
 
     def unpack(self, data):