sensors: allow reusing open SSH connections, add file:// transport

deploy_and_start_sensors() and stop_and_remove_sensors() now accept a
connected_config of (connection, sensor_config) pairs as an alternative
to the URI-keyed config dict, and start_monitoring() passes it through.
Deployment failures are logged instead of printed, the /proc pid check
is factored out into pid_running(), StdoutTransport writes through
self.fd, and a new FileTransport handles file:// monitor URIs.
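
A minimal usage sketch of the connected_config path (illustrative only:
the import paths, node address, monitor URI and the "block-io" sensor
name are assumptions, not taken from this patch):

    from disk_perf_test_tool.ssh_utils import connect
    from sensors.api import start_monitoring

    # Open the SSH connection once and hand it to the sensor code,
    # instead of passing a {uri: sensor_config} dict.
    conn = connect("root@192.168.0.10")      # assumed node address
    sensors_cfg = {"block-io": {}}           # assumed sensor config

    # config must be None when connected_config is given (enforced by
    # the assert in deploy_and_start_sensors).
    with start_monitoring("udp://192.168.0.1:5699",
                          config=None,
                          connected_config=[(conn, sensors_cfg)]):
        pass  # run the workload while sensors report to the monitor

    # The sensor code only closes connections it opened itself, so the
    # caller closes this one.
    conn.close()

    # The old URI-keyed form keeps working:
    #     start_monitoring(monitor_uri,
    #                      config={"root@192.168.0.10": sensors_cfg})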
diff --git a/sensors/api.py b/sensors/api.py
index f78e6a9..dc34af0 100644
--- a/sensors/api.py
+++ b/sensors/api.py
@@ -17,6 +17,7 @@
             data_q.put(proto.recv(0.1))
         except Timeout:
             pass
+
         try:
             val = cmd_q.get(False)
 
@@ -28,8 +29,9 @@
 
 
 @contextmanager
-def start_monitoring(uri, config):
-    deploy_and_start_sensors(uri, config)
+def start_monitoring(uri, config=None, connected_config=None):
+    deploy_and_start_sensors(uri, config=config,
+                             connected_config=connected_config)
     try:
         data_q = Queue.Queue()
         cmd_q = Queue.Queue()
@@ -44,4 +46,5 @@
             cmd_q.put(None)
             th.join()
     finally:
-        stop_and_remove_sensors(config)
+        stop_and_remove_sensors(config,
+                                connected_config=connected_config)
diff --git a/sensors/deploy_sensors.py b/sensors/deploy_sensors.py
index b2dc3f1..e0428d9 100644
--- a/sensors/deploy_sensors.py
+++ b/sensors/deploy_sensors.py
@@ -1,33 +1,50 @@
 import time
 import json
 import os.path
+import logging
 
 from concurrent.futures import ThreadPoolExecutor, wait
 
 from disk_perf_test_tool.ssh_utils import connect, copy_paths
 
+logger = logging.getLogger('io-perf-tool')
+
 
 def wait_all_ok(futures):
     return all(future.result() for future in futures)
 
 
-def deploy_and_start_sensors(monitor_uri, config, remote_path='/tmp/sensors'):
+def deploy_and_start_sensors(monitor_uri, config,
+                             remote_path='/tmp/sensors',
+                             connected_config=None):
     paths = {os.path.dirname(__file__): remote_path}
     with ThreadPoolExecutor(max_workers=32) as executor:
         futures = []
 
-        for uri, config in config.items():
+        if connected_config is not None:
+            assert config is None
+            node_iter = connected_config
+        else:
+            node_iter = config.items()
+
+        for uri_or_conn, config in node_iter:
             futures.append(executor.submit(deploy_and_start_sensor,
-                                           paths, uri, monitor_uri,
+                                           paths, uri_or_conn,
+                                           monitor_uri,
                                            config, remote_path))
 
         if not wait_all_ok(futures):
             raise RuntimeError("Sensor deployment fails on some nodes")
 
 
-def deploy_and_start_sensor(paths, uri, monitor_uri, config, remote_path):
+def deploy_and_start_sensor(paths, uri_or_conn, monitor_uri, config,
+                            remote_path):
     try:
-        conn = connect(uri)
+        if isinstance(uri_or_conn, basestring):
+            conn = connect(uri_or_conn)
+        else:
+            conn = uri_or_conn
+
         copy_paths(conn, paths)
         sftp = conn.open_sftp()
 
@@ -41,17 +58,23 @@
         cmd = cmd_templ.format(main_remote_path,
                                monitor_uri,
                                config_remote_path)
-        print "Executing", cmd
         conn.exec_command(cmd)
         sftp.close()
-        conn.close()
+
+        if isinstance(uri_or_conn, basestring):
+            conn.close()
     except:
+        logger.exception("During deploing sensors in {0}".format(uri_or_conn))
         return False
     return True
 
 
-def stop_and_remove_sensor(uri, remote_path):
-    conn = connect(uri)
+def stop_and_remove_sensor(uri_or_conn, remote_path):
+    if isinstance(uri_or_conn, basestring):
+        conn = connect(uri_or_conn)
+    else:
+        conn = uri_or_conn
+
     main_remote_path = os.path.join(remote_path, "main.py")
 
     cmd_templ = "python {0} -d stop"
@@ -62,15 +85,23 @@
 
     conn.exec_command("rm -rf {0}".format(remote_path))
 
-    conn.close()
+    if isinstance(uri_or_conn, basestring):
+        conn.close()
 
 
-def stop_and_remove_sensors(config, remote_path='/tmp/sensors'):
+def stop_and_remove_sensors(config, remote_path='/tmp/sensors',
+                            connected_config=None):
     with ThreadPoolExecutor(max_workers=32) as executor:
         futures = []
 
-        for uri, config in config.items():
+        if connected_config is not None:
+            assert config is None
+            conf_iter = connected_config
+        else:
+            conf_iter = config.items()
+
+        for uri_or_conn, config in conf_iter:
             futures.append(executor.submit(stop_and_remove_sensor,
-                                           uri, remote_path))
+                                           uri_or_conn, remote_path))
 
         wait(futures)
diff --git a/sensors/main.py b/sensors/main.py
index fea46a3..3c953fa 100644
--- a/sensors/main.py
+++ b/sensors/main.py
@@ -64,6 +64,10 @@
         time.sleep(opts.timeout)
 
 
+def pid_running(pid):
+    return os.path.exists("/proc/" + str(pid))
+
+
 def main(argv):
     opts = parse_args(argv)
 
@@ -86,12 +90,12 @@
         elif opts.daemon == 'stop':
             if os.path.isfile(pid_file):
                 pid = int(open(pid_file).read())
-                if os.path.exists("/proc/" + str(pid)):
+                if pid_running(pid):
                     os.kill(pid, signal.SIGTERM)
 
                 time.sleep(0.1)
 
-                if os.path.exists("/proc/" + str(pid)):
+                if pid_running(pid):
                     os.kill(pid, signal.SIGKILL)
 
                 if os.path.isfile(pid_file):
@@ -99,7 +103,7 @@
         elif opts.daemon == 'status':
             if os.path.isfile(pid_file):
                 pid = int(open(pid_file).read())
-                if os.path.exists("/proc/" + str(pid)):
+                if pid_running(pid):
                     print "running"
                     return
             print "stopped"
diff --git a/sensors/protocol.py b/sensors/protocol.py
index cfdd93e..02b661a 100644
--- a/sensors/protocol.py
+++ b/sensors/protocol.py
@@ -1,3 +1,4 @@
+import sys
 import time
 import socket
 import select
@@ -57,6 +58,7 @@
 
 # ------------------------------------- Transports ---------------------------
 
+
 class ITransport(object):
     def __init__(self, receiver):
         pass
@@ -73,12 +75,14 @@
 
     def __init__(self, receiver, delta=True):
         if receiver:
-            raise ValueError("StdoutTransport don't allows receiving")
+            cname = self.__class__.__name__
+            raise ValueError("{0} don't allows receiving".format(cname))
 
         self.headers = None
         self.line_format = ""
         self.prev = {}
         self.delta = delta
+        self.fd = sys.stdout
 
     def send(self, data):
         if self.headers is None:
@@ -100,10 +104,17 @@
         else:
             vals = [data[header].value for header in self.headers]
 
-        print self.line_format.format(*vals)
+        self.fd.write(self.line_format.format(*vals) + "\n")
 
     def recv(self, timeout=None):
-        raise ValueError("StdoutTransport don't allows receiving")
+        cname = self.__class__.__name__
+        raise ValueError("{0} don't allows receiving".format(cname))
+
+
+class FileTransport(StdoutTransport):
+    def __init__(self, receiver, fname, delta=True):
+        StdoutTransport.__init__(self, receiver, delta)
+        self.fd = open(fname, "w")
 
 
 class UDPTransport(ITransport):
@@ -170,10 +181,12 @@
         ip, port = parsed_uri.netloc.split(":")
         return UDPTransport(receiver, ip=ip, port=int(port),
                             packer_cls=PickleSerializer)
+    elif parsed_uri.scheme == 'file':
+        return FileTransport(receiver, parsed_uri.path)
     elif parsed_uri.scheme == 'hugeudp':
         ip, port = parsed_uri.netloc.split(":")
         return HugeUDPTransport(receiver, ip=ip, port=int(port),
-                            packer_cls=MSGPackSerializer)
+                                packer_cls=MSGPackSerializer)
     else:
         templ = "Can't instantiate transport from {0!r}"
         raise ValueError(templ.format(uri))