Add sensor management code
diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/__init__.py
diff --git a/sensors/__init__.py b/sensors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/sensors/__init__.py
diff --git a/sensors/api.py b/sensors/api.py
new file mode 100644
index 0000000..f78e6a9
--- /dev/null
+++ b/sensors/api.py
@@ -0,0 +1,47 @@
+import Queue
+import threading
+
+from contextlib import contextmanager
+
+from deploy_sensors import (deploy_and_start_sensors,
+                            stop_and_remove_sensors)
+from protocol import create_protocol, Timeout
+
+
+Empty = Queue.Empty
+
+
+def recv_main(proto, data_q, cmd_q):
+    # Forward everything proto receives into data_q until a None
+    # shutdown marker arrives on cmd_q.
+    while True:
+        try:
+            data_q.put(proto.recv(0.1))
+        except Timeout:
+            pass
+        try:
+            val = cmd_q.get(False)
+
+            if val is None:
+                return
+
+        except Queue.Empty:
+            pass
+
+
+@contextmanager
+def start_monitoring(uri, config):
+    deploy_and_start_sensors(uri, config)
+    try:
+        data_q = Queue.Queue()
+        cmd_q = Queue.Queue()
+        proto = create_protocol(uri, receiver=True)
+        th = threading.Thread(target=recv_main,
+                              args=(proto, data_q, cmd_q))
+        th.daemon = True
+        th.start()
+
+        try:
+            yield data_q
+        finally:
+            cmd_q.put(None)
+            th.join()
+    finally:
+        stop_and_remove_sensors(config)
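A minimal sketch of driving start_monitoring from a collector script (the receiver URI and the ssh URI/config below are placeholder assumptions, not values from this patch):

    # Hypothetical usage; mirrors how receiver.py consumes the queue.
    from api import start_monitoring, Empty

    config = {"root@192.168.0.104": {"block-io": {"allowed_prefixes": ["sda1"]}}}

    with start_monitoring("udp://0.0.0.0:12001", config) as data_q:
        for _ in range(60):
            try:
                (ip, port), data = data_q.get(True, 1)
                print ip, data
            except Empty:
                pass

On exit from the with block the receiver thread is joined and the remote sensors are stopped and removed.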
diff --git a/sensors/config.yaml b/sensors/config.yaml
new file mode 100644
index 0000000..61f622b
--- /dev/null
+++ b/sensors/config.yaml
@@ -0,0 +1,23 @@
+127.0.0.1:
+    block-io:
+        allowed_prefixes: [sda1, rbd1]
+    net-io:
+        allowed_prefixes: [virbr2]
+
+# 192.168.152.39:
+#     block-io:
+#         allowed_prefixes: [sdb]
+#     net-io:
+#         allowed_prefixes: [eth0]
+
+# 192.168.152.40:
+#     block-io:
+#         allowed_prefixes: [sdb]
+#     net-io:
+#         allowed_prefixes: [eth0]
+
+# 192.168.152.41:
+#     block-io:
+#         allowed_prefixes: [sdb]
+#     net-io:
+#         allowed_prefixes: [eth0]
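The YAML above maps each target host to the sensors to run there and their parameters; the per-host mapping is exactly what deploy_and_start_sensors serializes to conf.json on that node. A short sketch of how it is consumed:

    # Sketch; mirrors receiver.py and deploy_sensors.py.
    import yaml

    monitor_config = yaml.load(open("config.yaml").read())
    for uri, host_config in monitor_config.items():
        # host_config, e.g. {'block-io': {'allowed_prefixes': ['sda1', 'rbd1']}, ...},
        # is written verbatim as JSON to /tmp/sensors/conf.json on that node.
        print uri, host_config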
diff --git a/sensors/daemonize.py b/sensors/daemonize.py
new file mode 100644
index 0000000..1c3241b
--- /dev/null
+++ b/sensors/daemonize.py
@@ -0,0 +1,211 @@
+#!/usr/bin/python
+
+import fcntl
+import os
+import pwd
+import grp
+import sys
+import signal
+import resource
+import logging
+import atexit
+from logging import handlers
+
+
+class Daemonize(object):
+    """ Daemonize object
+    The constructor expects the following arguments:
+    - app: contains the application name which will be sent to syslog.
+    - pid: path to the pidfile.
+    - action: your custom function which will be executed after daemonization.
+    - keep_fds: optional list of fds which should not be closed.
+    - auto_close_fds: optional parameter to not close opened fds.
+    - privileged_action: action that will be executed before dropping privileges if the user
+                         or group parameter is provided.
+                         To pass anything from privileged_action to action, such as an opened
+                         privileged file descriptor, return it from privileged_action and
+                         receive it as an argument inside action.
+    - user: drop privileges to this user if provided.
+    - group: drop privileges to this group if provided.
+    - verbose: send debug messages to logger if provided.
+    - logger: use this logger object instead of creating new one, if provided.
+    """
+    def __init__(self, app, pid, action, keep_fds=None, auto_close_fds=True, privileged_action=None, user=None, group=None, verbose=False, logger=None):
+        self.app = app
+        self.pid = pid
+        self.action = action
+        self.keep_fds = keep_fds or []
+        self.privileged_action = privileged_action or (lambda: ())
+        self.user = user
+        self.group = group
+        self.logger = logger
+        self.verbose = verbose
+        self.auto_close_fds = auto_close_fds
+
+    def sigterm(self, signum, frame):
+        """ sigterm method
+        These actions will be done after SIGTERM.
+        """
+        self.logger.warn("Caught signal %s. Stopping daemon." % signum)
+        os.remove(self.pid)
+        sys.exit(0)
+
+    def exit(self):
+        """ exit method
+        Cleanup pid file at exit.
+        """
+        self.logger.warn("Stopping daemon.")
+        os.remove(self.pid)
+        sys.exit(0)
+
+    def start(self):
+        """ start method
+        Main daemonization process.
+        """
+        # If the pidfile already exists, read the old pid from it so we can restore
+        # the contents if locking fails, because opening the file with mode "w"
+        # below truncates it.
+        old_pid = ""
+        if os.path.isfile(self.pid):
+            with open(self.pid, "r") as old_pidfile:
+                old_pid = old_pidfile.read()
+        # Create a lockfile so that only one instance of this daemon is running at any time.
+        try:
+            lockfile = open(self.pid, "w")
+        except IOError:
+            print("Unable to create the pidfile.")
+            sys.exit(1)
+        try:
+            # Try to get an exclusive lock on the file. This will fail if another process has the file
+            # locked.
+            fcntl.flock(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
+        except IOError:
+            print("Unable to lock on the pidfile.")
+            # We need to overwrite the pidfile if we got here.
+            with open(self.pid, "w") as pidfile:
+                pidfile.write(old_pid)
+            sys.exit(1)
+
+        # Fork, creating a new process for the child.
+        process_id = os.fork()
+        if process_id < 0:
+            # Fork error. Exit badly.
+            sys.exit(1)
+        elif process_id != 0:
+            # This is the parent process. Exit.
+            sys.exit(0)
+        # This is the child process. Continue.
+
+        # Stop listening for signals that the parent process receives.
+        # This is done by getting a new process id.
+        # setpgrp() is an alternative to setsid().
+        # setsid creates a new session and detaches the process from its
+        # controlling terminal.
+        process_id = os.setsid()
+        if process_id == -1:
+            # Uh oh, there was a problem.
+            sys.exit(1)
+
+        # Add lockfile to self.keep_fds.
+        self.keep_fds.append(lockfile.fileno())
+
+        # Close all file descriptors, except the ones mentioned in self.keep_fds.
+        devnull = "/dev/null"
+        if hasattr(os, "devnull"):
+            # Python has set os.devnull on this system, use it instead as it might be different
+            # than /dev/null.
+            devnull = os.devnull
+
+        if self.auto_close_fds:
+            for fd in range(3, resource.getrlimit(resource.RLIMIT_NOFILE)[0]):
+                if fd not in self.keep_fds:
+                    try:
+                        os.close(fd)
+                    except OSError:
+                        pass
+
+        devnull_fd = os.open(devnull, os.O_RDWR)
+        os.dup2(devnull_fd, 0)
+        os.dup2(devnull_fd, 1)
+        os.dup2(devnull_fd, 2)
+
+        if self.logger is None:
+            # Initialize logging.
+            self.logger = logging.getLogger(self.app)
+            self.logger.setLevel(logging.DEBUG)
+            # Display log messages only on defined handlers.
+            self.logger.propagate = False
+
+            # Initialize syslog.
+            # It will correctly work on OS X, Linux and FreeBSD.
+            if sys.platform == "darwin":
+                syslog_address = "/var/run/syslog"
+            else:
+                syslog_address = "/dev/log"
+
+            # We will continue with syslog initialization only if the syslog socket
+            # actually exists on the machine we are running on.
+            if os.path.exists(syslog_address):
+                syslog = handlers.SysLogHandler(syslog_address)
+                if self.verbose:
+                    syslog.setLevel(logging.DEBUG)
+                else:
+                    syslog.setLevel(logging.INFO)
+                # Try to mimic to normal syslog messages.
+                formatter = logging.Formatter("%(asctime)s %(name)s: %(message)s",
+                                              "%b %e %H:%M:%S")
+                syslog.setFormatter(formatter)
+
+                self.logger.addHandler(syslog)
+
+        # Set umask to default to safe file permissions when running as a root daemon. 027 is an
+        # octal number which we are typing as 0o27 for Python3 compatibility.
+        os.umask(0o27)
+
+        # Change to a known directory. If this isn't done, starting a daemon in a subdirectory that
+        # needs to be deleted results in "directory busy" errors.
+        os.chdir("/")
+
+        # Execute privileged action
+        privileged_action_result = self.privileged_action()
+        if not privileged_action_result:
+            privileged_action_result = []
+
+        # Change gid
+        if self.group:
+            try:
+                gid = grp.getgrnam(self.group).gr_gid
+            except KeyError:
+                self.logger.error("Group {0} not found".format(self.group))
+                sys.exit(1)
+            try:
+                os.setgid(gid)
+            except OSError:
+                self.logger.error("Unable to change gid.")
+                sys.exit(1)
+
+        # Change uid
+        if self.user:
+            try:
+                uid = pwd.getpwnam(self.user).pw_uid
+            except KeyError:
+                self.logger.error("User {0} not found.".format(self.user))
+                sys.exit(1)
+            try:
+                os.setuid(uid)
+            except OSError:
+                self.logger.error("Unable to change uid.")
+                sys.exit(1)
+
+        try:
+            lockfile.write("%s" % (os.getpid()))
+            lockfile.flush()
+        except IOError:
+            self.logger.error("Unable to write pid to the pidfile.")
+            print("Unable to write pid to the pidfile.")
+            sys.exit(1)
+
+        # Set custom action on SIGTERM.
+        signal.signal(signal.SIGTERM, self.sigterm)
+        atexit.register(self.exit)
+
+        self.logger.warn("Starting daemon.")
+
+        self.action(*privileged_action_result)
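A minimal usage sketch for Daemonize, mirroring how main.py drives it (the app name and pid path are placeholders):

    from daemonize import Daemonize

    def run():
        while True:
            pass  # long-running work goes here

    daemon = Daemonize(app="example_app", pid="/tmp/example_app.pid", action=run)
    daemon.start()  # forks, detaches, writes the pidfile, then calls run()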
diff --git a/sensors/deploy_sensors.py b/sensors/deploy_sensors.py
new file mode 100644
index 0000000..b7e29b3
--- /dev/null
+++ b/sensors/deploy_sensors.py
@@ -0,0 +1,76 @@
+import time
+import json
+import os.path
+
+from disk_perf_test_tool.ssh_copy_directory import copy_paths
+from disk_perf_test_tool.ssh_runner import connect
+
+from concurrent.futures import ThreadPoolExecutor, wait
+
+
+def wait_all_ok(futures):
+    return all(future.result() for future in futures)
+
+
+def deploy_and_start_sensors(monitor_uri, config, remote_path='/tmp/sensors'):
+    paths = {os.path.dirname(__file__): remote_path}
+    with ThreadPoolExecutor(max_workers=32) as executor:
+        futures = []
+
+        for uri, host_config in config.items():
+            futures.append(executor.submit(deploy_and_start_sensor,
+                                           paths, uri, monitor_uri,
+                                           host_config, remote_path))
+
+        if not wait_all_ok(futures):
+            raise RuntimeError("Sensor deployment fails on some nodes")
+
+
+def deploy_and_start_sensor(paths, uri, monitor_uri, config, remote_path):
+    try:
+        conn = connect(uri)
+        copy_paths(conn, paths)
+        sftp = conn.open_sftp()
+
+        config_remote_path = os.path.join(remote_path, "conf.json")
+        main_remote_path = os.path.join(remote_path, "main.py")
+
+        with sftp.open(config_remote_path, "w") as fd:
+            fd.write(json.dumps(config))
+
+        cmd_templ = "python {0} -d start -u {1} {2}"
+        cmd = cmd_templ.format(main_remote_path,
+                               monitor_uri,
+                               config_remote_path)
+        conn.exec_command(cmd)
+        sftp.close()
+        conn.close()
+    except Exception as exc:
+        print "Failed to deploy sensors to {0}: {1!r}".format(uri, exc)
+        return False
+    return True
+
+
+def stop_and_remove_sensor(uri, remote_path):
+    conn = connect(uri)
+    main_remote_path = os.path.join(remote_path, "main.py")
+
+    cmd_templ = "python {0} -d stop"
+    conn.exec_command(cmd_templ.format(main_remote_path))
+    time.sleep(0.3)
+
+    conn.exec_command("rm -rf {0}".format(remote_path))
+
+    conn.close()
+
+
+def stop_and_remove_sensors(config, remote_path='/tmp/sensors'):
+    with ThreadPoolExecutor(max_workers=32) as executor:
+        futures = []
+
+        for uri in config:
+            futures.append(executor.submit(stop_and_remove_sensor,
+                                           uri, remote_path))
+
+        wait(futures)
diff --git a/sensors/discover.py b/sensors/discover.py
new file mode 100644
index 0000000..f227043
--- /dev/null
+++ b/sensors/discover.py
@@ -0,0 +1,9 @@
+all_sensors = {}
+
+
+def provides(sensor_class_name):
+    def closure(func):
+        assert sensor_class_name not in all_sensors
+        all_sensors[sensor_class_name] = func
+        return func
+    return closure
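Sketch of how a new sensor registers itself through provides (the sensor name and body here are hypothetical):

    from discover import provides, all_sensors
    from utils import SensorInfo

    @provides("example-sensor")
    def example_stat(disallowed_prefixes=None, allowed_prefixes=None):
        # must return {"<device>.<metric>": SensorInfo(value, is_accumulated)}
        return {"dev0.ticks": SensorInfo(0, True)}

    assert "example-sensor" in all_sensors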
diff --git a/sensors/grafana_template.js b/sensors/grafana_template.js
new file mode 100644
index 0000000..08f8f99
--- /dev/null
+++ b/sensors/grafana_template.js
@@ -0,0 +1,80 @@
+/* global _ */
+
+/*
+ * Complex scripted dashboard
+ * This script generates a dashboard object that Grafana can load. It also takes a number of user
+ * supplied URL parameters (in the ARGS variable)
+ *
+ * Return a dashboard object, or a function
+ *
+ * For async scripts, return a function; it must take a single callback function as an
+ * argument and call that callback with the dashboard object (look at scripted_async.js for an example)
+ */
+
+
+
+// accessible variables in this scope
+var window, document, ARGS, $, jQuery, moment, kbn;
+
+// Setup some variables
+var dashboard;
+
+// All url parameters are available via the ARGS object
+var ARGS;
+
+// Initialize a skeleton with nothing but a rows array and service object
+dashboard = {
+  rows : [],
+};
+
+// Set a title
+dashboard.title = 'Tests dash';
+
+// Set default time
+// time can be overridden in the url using from/to parameters, but this is
+// handled automatically in grafana core during dashboard initialization
+dashboard.time = {
+  from: "now-5m",
+  to: "now"
+};
+
+var rows = 1;
+var seriesName = 'argName';
+
+if(!_.isUndefined(ARGS.rows)) {
+  rows = parseInt(ARGS.rows, 10);
+}
+
+if(!_.isUndefined(ARGS.name)) {
+  seriesName = ARGS.name;
+}
+
+for (var i = 0; i < rows; i++) {
+
+  dashboard.rows.push({
+    title: 'Chart',
+    height: '300px',
+    panels: [
+      {
+        title: 'Events',
+        type: 'graph',
+        span: 12,
+        fill: 1,
+        linewidth: 2,
+        targets: [
+              {"target": "disk io",
+               "query": "select value from \"sectors_written\" where $timeFilter and host='192.168.0.104' and device='sda1' order asc",
+               "interval": "",
+               "alias": "host io sw",
+               "rawQuery": true}       
+        ],
+        tooltip: {
+          shared: true
+        }
+      }
+    ]
+  });
+}
+
+
+return dashboard;
diff --git a/sensors/host1_config.json b/sensors/host1_config.json
new file mode 100644
index 0000000..7e81125
--- /dev/null
+++ b/sensors/host1_config.json
@@ -0,0 +1,8 @@
+{
+	"block-io": {
+		"allowed_prefixes": ["sdb1"]
+	},
+	"net-io": {
+		"allowed_prefixes": ["eth0"]
+	}
+}
\ No newline at end of file
diff --git a/sensors/influx_exporter.py b/sensors/influx_exporter.py
new file mode 100644
index 0000000..34b3c0a
--- /dev/null
+++ b/sensors/influx_exporter.py
@@ -0,0 +1,31 @@
+from urlparse import urlparse
+from influxdb import InfluxDBClient
+
+
+def connect(url):
+    parsed_url = urlparse(url)
+    user_passwd, host_port = parsed_url.netloc.rsplit("@", 1)
+    user, passwd = user_passwd.split(":", 1)
+    host, port = host_port.split(":")
+    return InfluxDBClient(host, int(port), user, passwd, parsed_url.path[1:])
+
+
+def add_data(conn, hostname, data):
+    per_sensor_data = {}
+    for serie in data:
+        serie = serie.copy()
+        gtime = serie.pop('time')
+        for key, val in serie.items():
+            dev, sensor = key.split('.')
+            points = per_sensor_data.setdefault(sensor, [])
+            points.append([gtime, hostname, dev, val])
+
+    infl_data = []
+    columns = ['time', 'host', 'device', 'value']
+    for sensor_name, points in per_sensor_data.items():
+        infl_data.append(
+            {'columns': columns,
+             'name': sensor_name,
+             'points': points})
+
+    conn.write_points(infl_data)
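add_data reshapes each flat serie dict, keyed "<device>.<sensor>" plus a 'time' entry, into one InfluxDB series per sensor with columns [time, host, device, value]. An illustrative transformation (values invented):

    # Input serie for host 'node1':
    serie = {'time': 1400000000.0,
             'sda1.sectors_written': 120,
             'eth0.recv_bytes': 4096}
    # write_points payload produced:
    # [{'name': 'sectors_written', 'columns': ['time', 'host', 'device', 'value'],
    #   'points': [[1400000000.0, 'node1', 'sda1', 120]]},
    #  {'name': 'recv_bytes', 'columns': ['time', 'host', 'device', 'value'],
    #   'points': [[1400000000.0, 'node1', 'eth0', 4096]]}]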
diff --git a/sensors/io_sensors.py b/sensors/io_sensors.py
new file mode 100644
index 0000000..2fdd24e
--- /dev/null
+++ b/sensors/io_sensors.py
@@ -0,0 +1,46 @@
+from discover import provides
+from utils import SensorInfo, is_dev_accepted
+
+#  1 - major number
+#  2 - minor number
+#  3 - device name
+#  4 - reads completed successfully
+#  5 - reads merged
+#  6 - sectors read
+#  7 - time spent reading (ms)
+#  8 - writes completed
+#  9 - writes merged
+# 10 - sectors written
+# 11 - time spent writing (ms)
+# 12 - I/Os currently in progress
+# 13 - time spent doing I/Os (ms)
+# 14 - weighted time spent doing I/Os (ms)
+
+# (pos, name, is_accumulated); pos is a 0-based index into the split
+# /proc/diskstats line, so pos 3 corresponds to field 4 above.
+io_values_pos = [
+    (3, 'reads_completed', True),
+    (5, 'sectors_read', True),
+    (6, 'rtime', True),
+    (7, 'writes_completed', True),
+    (9, 'sectors_written', True),
+    (10, 'wtime', True),
+    (11, 'io_queue', False),
+    (13, 'io_time', True)
+]
+
+
+@provides("block-io")
+def io_stat(disallowed_prefixes=('ram', 'loop'), allowed_prefixes=None):
+    results = {}
+    for line in open('/proc/diskstats'):
+        vals = line.split()
+        dev_name = vals[2]
+
+        dev_ok = is_dev_accepted(dev_name,
+                                 disallowed_prefixes,
+                                 allowed_prefixes)
+
+        if dev_ok:
+            for pos, name, accum_val in io_values_pos:
+                sensor_name = "{0}.{1}".format(dev_name, name)
+                results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+    return results
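For reference, what io_stat yields for a single /proc/diskstats line (numbers invented):

    # '8 1 sda1 120 0 2400 30 80 0 1600 40 0 60 70' ->
    # {'sda1.reads_completed': SensorInfo(120, True),
    #  'sda1.sectors_read': SensorInfo(2400, True),
    #  'sda1.rtime': SensorInfo(30, True),
    #  'sda1.writes_completed': SensorInfo(80, True),
    #  'sda1.sectors_written': SensorInfo(1600, True),
    #  'sda1.wtime': SensorInfo(40, True),
    #  'sda1.io_queue': SensorInfo(0, False),
    #  'sda1.io_time': SensorInfo(70, True)}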
diff --git a/sensors/main.py b/sensors/main.py
new file mode 100644
index 0000000..07a1dce
--- /dev/null
+++ b/sensors/main.py
@@ -0,0 +1,103 @@
+import os
+import sys
+import time
+import json
+import signal
+import os.path
+import argparse
+
+import io_sensors
+import net_sensors
+
+from utils import SensorInfo
+from daemonize import Daemonize
+from discover import all_sensors
+from protocol import create_protocol
+
+
+def get_values(required_sensors):
+    result = {}
+    for sensor_name, params in required_sensors:
+        if sensor_name in all_sensors:
+            result.update(all_sensors[sensor_name](**params))
+        else:
+            msg = "Sensor {0!r} isn't available".format(sensor_name)
+            raise ValueError(msg)
+    return time.time(), result
+
+
+def parse_args(args):
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-d', '--daemon',
+                        choices=('start', 'stop', 'status'),
+                        default=None)
+
+    parser.add_argument('-u', '--url', default='stdout://')
+    parser.add_argument('-t', '--timeout', type=float, default=1)
+    parser.add_argument('sensors_config', type=argparse.FileType('r'),
+                        default=None, nargs='?')
+    return parser.parse_args(args[1:])
+
+
+def daemon_main(required_sensors, opts):
+    sender = create_protocol(opts.url)
+    prev = {}
+
+    while True:
+        gtime, data = get_values(required_sensors.items())
+        curr = {'time': SensorInfo(gtime, True)}
+        for name, val in data.items():
+            if val.is_accumulated:
+                if name in prev:
+                    curr[name] = SensorInfo(val.value - prev[name], True)
+                prev[name] = val.value
+            else:
+                curr[name] = SensorInfo(val.value, False)
+        sender.send(curr)
+        time.sleep(opts.timeout)
+
+
+def main(argv):
+    opts = parse_args(argv)
+
+    if opts.daemon is not None:
+        pid_file = "/tmp/sensors.pid"
+        if opts.daemon == 'start':
+            required_sensors = json.loads(opts.sensors_config.read())
+
+            def root_func():
+                daemon_main(required_sensors, opts)
+
+            daemon = Daemonize(app="perfcollect_app",
+                               pid=pid_file,
+                               action=root_func)
+            daemon.start()
+        elif opts.daemon == 'stop':
+            if os.path.isfile(pid_file):
+                pid = int(open(pid_file).read())
+                if os.path.exists("/proc/" + str(pid)):
+                    os.kill(pid, signal.SIGTERM)
+
+                time.sleep(0.1)
+
+                if os.path.exists("/proc/" + str(pid)):
+                    os.kill(pid, signal.SIGKILL)
+
+                if os.path.isfile(pid_file):
+                    os.unlink(pid_file)
+        elif opts.daemon == 'status':
+            if os.path.isfile(pid_file):
+                pid = int(open(pid_file).read())
+                if os.path.exists("/proc/" + str(pid)):
+                    print "running"
+                    return
+            print "stopped"
+        else:
+            raise ValueError("Unknown daemon operation {}".format(opts.daemon))
+    else:
+        required_sensors = json.loads(opts.sensors_config.read())
+        daemon_main(required_sensors, opts)
+    return 0
+
+if __name__ == "__main__":
+    exit(main(sys.argv))
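daemon_main sends deltas for accumulated sensors and raw values for the rest; a worked sketch of two polling rounds (numbers invented):

    # poll 1: {'sda1.sectors_written': SensorInfo(100, True)}
    #   -> only 'time' is sent; the first accumulated reading is withheld
    # poll 2: {'sda1.sectors_written': SensorInfo(160, True)}
    #   -> sends SensorInfo(60, True), i.e. 160 - 100
    # non-accumulated values such as io_queue are sent as-is every round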
diff --git a/sensors/net_sensors.py b/sensors/net_sensors.py
new file mode 100644
index 0000000..3a2d926
--- /dev/null
+++ b/sensors/net_sensors.py
@@ -0,0 +1,43 @@
+from discover import provides
+from utils import SensorInfo, is_dev_accepted
+
+# /proc/net/dev fields per interface, 0-based after the "iface:" prefix:
+#  0 - recv bytes
+#  1 - recv packets
+#  2 - recv errs
+#  3 - recv drop
+#  4 - recv fifo
+#  5 - recv frame
+#  6 - recv compressed
+#  7 - recv multicast
+#  8 - send bytes
+#  9 - send packets
+
+net_values_pos = [
+    (0, 'recv_bytes', True),
+    (1, 'recv_packets', True),
+    (8, 'send_bytes', True),
+    (9, 'send_packets', True),
+]
+
+
+@provides("net-io")
+def net_stat(disallowed_prefixes=('docker',), allowed_prefixes=None):
+    results = {}
+
+    for line in open('/proc/net/dev').readlines()[2:]:
+        dev_name, stats = line.split(":", 1)
+        dev_name = dev_name.strip()
+        vals = stats.split()
+
+        dev_ok = is_dev_accepted(dev_name,
+                                 disallowed_prefixes,
+                                 allowed_prefixes)
+        if dev_ok:
+            for pos, name, accum_val in net_values_pos:
+                sensor_name = "{0}.{1}".format(dev_name, name)
+                results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+    return results
diff --git a/sensors/protocol.py b/sensors/protocol.py
new file mode 100644
index 0000000..fa88927
--- /dev/null
+++ b/sensors/protocol.py
@@ -0,0 +1,120 @@
+import socket
+import select
+import cPickle as pickle
+from urlparse import urlparse
+
+
+class Timeout(Exception):
+    pass
+
+
+# ------------------------------------- Serializers --------------------------
+
+
+class ISensorResultsSerializer(object):
+    def pack(self, data):
+        pass
+
+    def unpack(self, data):
+        pass
+
+
+class PickleSerializer(ISensorResultsSerializer):
+    def pack(self, data):
+        ndata = {key: val.value for key, val in data.items()}
+        return pickle.dumps(ndata)
+
+    def unpack(self, data):
+        return pickle.loads(data)
+
+
+# ------------------------------------- Transports ---------------------------
+
+class ITransport(object):
+    def __init__(self, receiver):
+        pass
+
+    def send(self, data):
+        pass
+
+    def recv(self, timeout=None):
+        pass
+
+
+class StdoutTransport(ITransport):
+    MIN_COL_WIDTH = 10
+
+    def __init__(self, receiver, delta=True):
+        if receiver:
+            raise ValueError("StdoutTransport don't allows receiving")
+
+        self.headers = None
+        self.line_format = ""
+        self.prev = {}
+        self.delta = delta
+
+    def send(self, data):
+        if self.headers is None:
+            self.headers = sorted(data)
+
+            for pos, header in enumerate(self.headers):
+                self.line_format += "{%s:>%s}" % (pos,
+                                                  max(len(header) + 1,
+                                                      self.MIN_COL_WIDTH))
+
+            print self.line_format.format(*self.headers)
+
+        if self.delta:
+            vals = [data[header].value - self.prev.get(header, 0)
+                    for header in self.headers]
+
+            self.prev.update({header: data[header].value
+                              for header in self.headers})
+        else:
+            vals = [data[header].value for header in self.headers]
+
+        print self.line_format.format(*vals)
+
+    def recv(self, timeout=None):
+        raise ValueError("StdoutTransport don't allows receiving")
+
+
+class UDPTransport(ITransport):
+    def __init__(self, receiver, ip, port, packer_cls):
+        self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        if receiver:
+            self.port.bind((ip, port))
+            self.packer_cls = packer_cls
+            self.packers = {}
+        else:
+            self.packer = packer_cls()
+            self.dst = (ip, port)
+
+    def send(self, data):
+        raw_data = self.packer.pack(data)
+        self.port.sendto(raw_data, self.dst)
+
+    def recv(self, timeout=None):
+        r, _, _ = select.select([self.port], [], [], timeout)
+        if len(r) != 0:
+            raw_data, addr = self.port.recvfrom(10000)
+            packer = self.packers.setdefault(addr, self.packer_cls())
+            return addr, packer.unpack(raw_data)
+        else:
+            raise Timeout()
+
+
+# -------------------------- Factory function --------------------------------
+
+
+def create_protocol(uri, receiver=False):
+    parsed_uri = urlparse(uri)
+    if parsed_uri.scheme == 'stdout':
+        return StdoutTransport(receiver)
+    elif parsed_uri.scheme == 'udp':
+        ip, port = parsed_uri.netloc.split(":")
+        return UDPTransport(receiver, ip=ip, port=int(port),
+                            packer_cls=PickleSerializer)
+    else:
+        templ = "Can't instantiate transport from {0!r}"
+        raise ValueError(templ.format(uri))
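A sketch of a matching UDP sender/receiver pair built through create_protocol (addresses are placeholders):

    from protocol import create_protocol, Timeout
    from utils import SensorInfo

    receiver = create_protocol("udp://0.0.0.0:12001", receiver=True)
    sender = create_protocol("udp://127.0.0.1:12001")

    # PickleSerializer strips SensorInfo down to bare values on the wire.
    sender.send({'sda1.io_time': SensorInfo(60, True)})
    try:
        addr, data = receiver.recv(0.5)
        print addr, data  # (('127.0.0.1', <port>), {'sda1.io_time': 60})
    except Timeout:
        pass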
diff --git a/sensors/receiver.py b/sensors/receiver.py
new file mode 100644
index 0000000..369a65f
--- /dev/null
+++ b/sensors/receiver.py
@@ -0,0 +1,43 @@
+import yaml
+
+from api import start_monitoring, Empty
+from influx_exporter import connect, add_data
+
+monitor_config = yaml.load(open("config.yaml").read())
+
+uri = "udp://192.168.0.104:12001"
+infldb_url = "influxdb://perf:perf@192.168.152.42:8086/perf"
+conn = connect(infldb_url)
+
+sw_per_ip = {}
+expected = ['192.168.0.104', '192.168.152.41',
+            '192.168.152.39', '192.168.152.40']
+count = len(expected)
+
+with start_monitoring(uri, monitor_config) as queue:
+    while True:
+        try:
+            (ip, port), data = queue.get(True, 1)
+
+            if 'sda1.sectors_written' in data:
+                val = data['sda1.sectors_written']
+            elif 'sdb.sectors_written' in data:
+                val = data['sdb.sectors_written']
+            else:
+                val = 0
+
+            sw_per_ip[ip] = sw_per_ip.get(ip, 0) + val
+            count -= 1
+
+            if 0 == count:
+                try:
+                    vals = [sw_per_ip[ip] for ip in expected]
+                    print ("{:>6}" * 4).format(*vals)
+                    sw_per_ip = {}
+                    count = len(expected)
+                except KeyError:
+                    pass
+
+            add_data(conn, ip, [data])
+        except Empty:
+            pass
diff --git a/sensors/utils.py b/sensors/utils.py
new file mode 100644
index 0000000..5af0a2a
--- /dev/null
+++ b/sensors/utils.py
@@ -0,0 +1,44 @@
+from collections import namedtuple
+
+SensorInfo = namedtuple("SensorInfo", ['value', 'is_accumulated'])
+
+
+def is_dev_accepted(name, disallowed_prefixes, allowed_prefixes):
+    dev_ok = True
+
+    if disallowed_prefixes is not None:
+        dev_ok = all(not name.startswith(prefix)
+                     for prefix in disallowed_prefixes)
+
+    if dev_ok and allowed_prefixes is not None:
+        dev_ok = any(name.startswith(prefix)
+                     for prefix in allowed_prefixes)
+
+    return dev_ok
+
+
+def delta(func, only_upd=True):
+    prev = {}
+    while True:
+        for dev_name, vals in func():
+            if dev_name not in prev:
+                prev[dev_name] = {}
+                for name, (val, _) in vals.items():
+                    prev[dev_name][name] = val
+            else:
+                dev_prev = prev[dev_name]
+                res = {}
+                for stat_name, (val, accum_val) in vals.items():
+                    if accum_val:
+                        if stat_name in dev_prev:
+                            val_delta = int(val) - int(dev_prev[stat_name])
+                            if not only_upd or 0 != val_delta:
+                                res[stat_name] = str(val_delta)
+                        dev_prev[stat_name] = val
+                    elif not only_upd or '0' != val:
+                        res[stat_name] = val
+
+                if only_upd and len(res) == 0:
+                    continue
+                yield dev_name, res
+        yield None, None
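is_dev_accepted applies the disallow list first, then the allow list; a few illustrative calls:

    from utils import is_dev_accepted

    is_dev_accepted('sda1', ('ram', 'loop'), None)     # True: passes disallow, no allow list
    is_dev_accepted('loop0', ('ram', 'loop'), None)    # False: disallowed prefix
    is_dev_accepted('sda1', ('ram', 'loop'), ['sdb'])  # False: not in allow list
    is_dev_accepted('sdb2', None, ['sdb'])             # True: matches allow list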
diff --git a/ssh_runner.py b/ssh_runner.py
index 5cb26de..28996ab 100644
--- a/ssh_runner.py
+++ b/ssh_runner.py
@@ -55,8 +55,8 @@
 
 
 def parse_ssh_uri(uri):
-    # user:passwd@ip_host:port
-    # user:passwd@ip_host
+    # user:passwd@@ip_host:port
+    # user:passwd@@ip_host
     # user@ip_host:port
     # user@ip_host
     # ip_host:port
@@ -68,6 +68,8 @@
 
     res = ConnCreds()
     res.port = "22"
+    res.key_file = None
+    res.passwd = None
 
     for rr in uri_reg_exprs:
         rrm = re.match(rr, uri)
diff --git a/storage_api.py b/storage_api.py
index 28ec060..1dbf30c 100644
--- a/storage_api.py
+++ b/storage_api.py
@@ -6,6 +6,7 @@
 from flask import url_for
 import os
 
+
 class Measurement(object):
     def __init__(self):
         self.build = ""
@@ -27,7 +28,7 @@
             s = stdev(item[1])
             build[item[0]] = [m, s]
 
-            
+
 def mean(l):
     n = len(l)
 
diff --git a/utils.py b/utils.py
index 93c82e0..ca65409 100644
--- a/utils.py
+++ b/utils.py
@@ -1,5 +1,7 @@
 import time
 import socket
+import os.path
+import getpass
 import logging
 import threading
 import contextlib
@@ -39,9 +41,14 @@
     ssh.known_hosts = None
     for i in range(retry_count):
         try:
+            if creds.user is None:
+                user = getpass.getuser()
+            else:
+                user = creds.user
+
             if creds.passwd is not None:
                 ssh.connect(creds.host,
-                            username=creds.user,
+                            username=user,
                             password=creds.passwd,
                             port=creds.port,
                             allow_agent=False,
@@ -50,12 +57,20 @@
 
             if creds.key_file is not None:
                 ssh.connect(creds.host,
-                            username=creds.user,
+                            username=user,
                             key_filename=creds.key_file,
                             look_for_keys=False,
                             port=creds.port)
                 return ssh
-            raise ValueError("Wrong credentials {0}".format(creds.__dict__))
+
+            key_file = os.path.expanduser('~/.ssh/id_rsa')
+            ssh.connect(creds.host,
+                        username=user,
+                        key_filename=key_file,
+                        look_for_keys=False,
+                        port=creds.port)
+            return ssh
+            # raise ValueError("Wrong credentials {0}".format(creds.__dict__))
         except paramiko.PasswordRequiredException:
             raise
         except socket.error: