a lot of changes
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
index e8c6261..52d33ed 100644
--- a/wally/sensors/api.py
+++ b/wally/sensors/api.py
@@ -1,21 +1,15 @@
-import Queue
+import os
+import time
+import json
 import logging
-import threading
+import contextlib
 
-from .deploy_sensors import (deploy_and_start_sensors,
-                             stop_and_remove_sensors)
-from .protocol import create_protocol, Timeout, CantUnpack
+from concurrent.futures import ThreadPoolExecutor
+
+from wally.ssh_utils import (copy_paths, run_over_ssh,
+                             save_to_remote, read_from_remote)
 
 
-__all__ = ['Empty', 'recv_main',
-           'deploy_and_start_sensors',
-           'SensorConfig',
-           'stop_and_remove_sensors',
-           'start_listener_thread',
-           ]
-
-
-Empty = Queue.Empty
 logger = logging.getLogger("wally.sensors")
 
 
@@ -29,40 +23,71 @@
         self.monitor_url = monitor_url
 
 
-def recv_main(proto, data_q, cmd_q):
-    while True:
+@contextlib.contextmanager
+def with_sensors(sensor_configs, remote_path):
+    paths = {os.path.dirname(__file__):
+             os.path.join(remote_path, "sensors")}
+    config_remote_path = os.path.join(remote_path, "conf.json")
+
+    def deploy_sensors(node_sensor_config):
+        copy_paths(node_sensor_config.conn, paths)
+        with node_sensor_config.conn.open_sftp() as sftp:
+            sensors_config = node_sensor_config.sensors.copy()
+            sensors_config['source_id'] = node_sensor_config.source_id
+            save_to_remote(sftp, config_remote_path,
+                           json.dumps(sensors_config))
+
+    def remove_sensors(node_sensor_config):
+        run_over_ssh(node_sensor_config.conn,
+                     "rm -rf {0}".format(remote_path),
+                     node=node_sensor_config.url, timeout=10)
+
+    logger.debug("Installing sensors on {0} nodes".format(len(sensor_configs)))
+    with ThreadPoolExecutor(max_workers=32) as executor:
+        list(executor.map(deploy_sensors, sensor_configs))
         try:
-            ip, packet = proto.recv(0.1)
-            if packet is not None:
-                data_q.put((ip, packet))
-        except AssertionError as exc:
-            logger.warning("Error in sensor data " + str(exc))
-        except Timeout:
-            pass
-        except CantUnpack as exc:
-            print exc
+            yield
+        finally:
+            list(executor.map(remove_sensors, sensor_configs))
 
+
+@contextlib.contextmanager
+def sensors_info(sensor_configs, remote_path):
+    config_remote_path = os.path.join(remote_path, "conf.json")
+
+    def start_sensors(node_sensor_config):
+        cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
+                    "sensors.main -d start -u {1} {2}"
+
+        cmd = cmd_templ.format(remote_path,
+                               node_sensor_config.monitor_url,
+                               config_remote_path)
+
+        run_over_ssh(node_sensor_config.conn, cmd,
+                     node=node_sensor_config.url)
+
+    def stop_and_gather_data(node_sensor_config):
+        cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'
+        cmd = cmd.format(remote_path)
+        run_over_ssh(node_sensor_config.conn, cmd,
+                     node=node_sensor_config.url)
+        # some magic
+        time.sleep(1)
+
+        assert node_sensor_config.monitor_url.startswith("csvfile://")
+
+        res_path = node_sensor_config.monitor_url.split("//", 1)[1]
+        with node_sensor_config.conn.open_sftp() as sftp:
+            res = read_from_remote(sftp, res_path)
+
+        return res
+
+    results = []
+
+    logger.debug("Starting sensors on {0} nodes".format(len(sensor_configs)))
+    with ThreadPoolExecutor(max_workers=32) as executor:
+        list(executor.map(start_sensors, sensor_configs))
         try:
-            val = cmd_q.get(False)
-
-            if val is None:
-                return
-
-        except Queue.Empty:
-            pass
-
-
-def start_listener_thread(uri):
-    data_q = Queue.Queue()
-    cmd_q = Queue.Queue()
-    logger.debug("Listening for sensor data on " + uri)
-    proto = create_protocol(uri, receiver=True)
-    th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
-    th.daemon = True
-    th.start()
-
-    def stop_thread():
-        cmd_q.put(None)
-        th.join()
-
-    return data_q, stop_thread
+            yield results
+        finally:
+            results.extend(executor.map(stop_and_gather_data, sensor_configs))
diff --git a/wally/sensors/deploy_sensors.py b/wally/sensors/deploy_sensors.py
index 4a1c5df..82ab21a 100644
--- a/wally/sensors/deploy_sensors.py
+++ b/wally/sensors/deploy_sensors.py
@@ -3,9 +3,8 @@
 import os.path
 import logging
 
-from concurrent.futures import ThreadPoolExecutor, wait
-
-from wally.ssh_utils import copy_paths, run_over_ssh
+from wally.ssh_utils import (copy_paths, run_over_ssh,
+                             save_to_remote, read_from_remote)
 
 logger = logging.getLogger('wally.sensors')
 
@@ -34,25 +33,23 @@
 def deploy_and_start_sensor(paths, node_sensor_config, remote_path):
     try:
         copy_paths(node_sensor_config.conn, paths)
-        sftp = node_sensor_config.conn.open_sftp()
+        with node_sensor_config.conn.open_sftp() as sftp:
+            config_remote_path = os.path.join(remote_path, "conf.json")
 
-        config_remote_path = os.path.join(remote_path, "conf.json")
+            sensors_config = node_sensor_config.sensors.copy()
+            sensors_config['source_id'] = node_sensor_config.source_id
+            with sftp.open(config_remote_path, "w") as fd:
+                fd.write(json.dumps(sensors_config))
 
-        sensors_config = node_sensor_config.sensors.copy()
-        sensors_config['source_id'] = node_sensor_config.source_id
-        with sftp.open(config_remote_path, "w") as fd:
-            fd.write(json.dumps(sensors_config))
+            cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
+                        "sensors.main -d start -u {1} {2}"
 
-        cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
-                    "sensors.main -d start -u {1} {2}"
+            cmd = cmd_templ.format(os.path.dirname(remote_path),
+                                   node_sensor_config.monitor_url,
+                                   config_remote_path)
 
-        cmd = cmd_templ.format(os.path.dirname(remote_path),
-                               node_sensor_config.monitor_url,
-                               config_remote_path)
-
-        run_over_ssh(node_sensor_config.conn, cmd,
-                     node=node_sensor_config.url)
-        sftp.close()
+            run_over_ssh(node_sensor_config.conn, cmd,
+                         node=node_sensor_config.url)
 
     except:
         msg = "During deploing sensors on {0}".format(node_sensor_config.url)
diff --git a/wally/sensors/influx_exporter.py b/wally/sensors/influx_exporter.py
deleted file mode 100644
index 34b3c0a..0000000
--- a/wally/sensors/influx_exporter.py
+++ /dev/null
@@ -1,31 +0,0 @@
-from urlparse import urlparse
-from influxdb import InfluxDBClient
-
-
-def connect(url):
-    parsed_url = urlparse(url)
-    user_passwd, host_port = parsed_url.netloc.rsplit("@", 1)
-    user, passwd = user_passwd.split(":", 1)
-    host, port = host_port.split(":")
-    return InfluxDBClient(host, int(port), user, passwd, parsed_url.path[1:])
-
-
-def add_data(conn, hostname, data):
-    per_sensor_data = {}
-    for serie in data:
-        serie = serie.copy()
-        gtime = serie.pop('time')
-        for key, val in serie.items():
-            dev, sensor = key.split('.')
-            data = per_sensor_data.setdefault(sensor, [])
-            data.append([gtime, hostname, dev, val])
-
-    infl_data = []
-    columns = ['time', 'host', 'device', 'value']
-    for sensor_name, points in per_sensor_data.items():
-        infl_data.append(
-            {'columns': columns,
-             'name': sensor_name,
-             'points': points})
-
-    conn.write_points(infl_data)
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
index 2d0bc81..20eedc5 100644
--- a/wally/sensors/main.py
+++ b/wally/sensors/main.py
@@ -35,7 +35,9 @@
 def parse_args(args):
     parser = argparse.ArgumentParser()
     parser.add_argument('-d', '--daemon',
-                        choices=('start', 'stop', 'status'),
+                        choices=('start', 'stop', 'status',
+                                 'start_monitoring', 'stop_monitoring',
+                                 'dump_ram_data'),
                         default=None)
 
     parser.add_argument('-u', '--url', default='stdout://')
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
index c053011..7c8aa0e 100644
--- a/wally/sensors/protocol.py
+++ b/wally/sensors/protocol.py
@@ -188,6 +188,7 @@
     def send(self, data):
         if self.headers is None:
             self.headers = sorted(data)
+            self.headers.remove('source_id')
 
             for pos, header in enumerate(self.headers):
                 self.line_format += "{%s:>%s}" % (pos,
@@ -197,6 +198,7 @@
             print self.line_format.format(*self.headers)
 
         if self.delta:
+
             vals = [data[header].value - self.prev.get(header, 0)
                     for header in self.headers]
 
@@ -219,7 +221,7 @@
 
 
 class CSVFileTransport(ITransport):
-    required_keys = set(['time', 'source_id', 'hostname'])
+    required_keys = set(['time', 'source_id'])
 
     def __init__(self, receiver, fname):
         ITransport.__init__(self, receiver)
@@ -234,10 +236,25 @@
             assert self.required_keys.issubset(keys)
             keys -= self.required_keys
             self.field_list = sorted(keys)
-            self.csv_fd.writerow([data['source_id'], data['hostname']] +
+            self.csv_fd.writerow([data['source_id'], socket.getfqdn()] +
                                  self.field_list)
+            self.field_list = ['time'] + self.field_list
 
-        self.csv_fd.writerow(map(data.__getitem__, ['time'] + self.field_list))
+        self.csv_fd.writerow([data[sens].value for sens in self.field_list])
+
+
+class RAMTransport(ITransport):
+    def __init__(self, next_tr):
+        self.next = next_tr
+        self.data = []
+
+    def send(self, data):
+        self.data.append(data)
+
+    def flush(self):
+        for data in self.data:
+            self.next.send(data)
+        self.data = []
 
 
 class UDPTransport(ITransport):
@@ -269,10 +286,11 @@
 
 
 def create_protocol(uri, receiver=False):
-    parsed_uri = urlparse(uri)
-    if parsed_uri.scheme == 'stdout':
+    if uri == 'stdout':
         return StdoutTransport(receiver)
-    elif parsed_uri.scheme == 'udp':
+
+    parsed_uri = urlparse(uri)
+    if parsed_uri.scheme == 'udp':
         ip, port = parsed_uri.netloc.split(":")
 
         if receiver:
@@ -286,6 +304,9 @@
         return FileTransport(receiver, parsed_uri.path)
     elif parsed_uri.scheme == 'csvfile':
         return CSVFileTransport(receiver, parsed_uri.path)
+    elif parsed_uri.scheme == 'ram':
+        intenal_recv = CSVFileTransport(receiver, parsed_uri.path)
+        return RAMTransport(intenal_recv)
     else:
         templ = "Can't instantiate transport from {0!r}"
         raise ValueError(templ.format(uri))