Very large refactoring: extract the sensors subsystem into the wally.sensors package
diff --git a/wally/sensors/__init__.py b/wally/sensors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wally/sensors/__init__.py
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
new file mode 100644
index 0000000..f66bb36
--- /dev/null
+++ b/wally/sensors/api.py
@@ -0,0 +1,58 @@
+import Queue
+import threading
+from contextlib import contextmanager
+
+from .deploy_sensors import (deploy_and_start_sensors,
+                             stop_and_remove_sensors)
+from .protocol import create_protocol, Timeout
+
+
+__all__ = ['Empty', 'recv_main', 'start_monitoring',
+           'deploy_and_start_sensors', 'SensorConfig']
+
+
+Empty = Queue.Empty
+
+
+class SensorConfig(object):
+    def __init__(self, conn, url, sensors):
+        self.conn = conn
+        self.url = url
+        self.sensors = sensors
+
+
+def recv_main(proto, data_q, cmd_q):
+    while True:
+        try:
+            data_q.put(proto.recv(0.1))
+        except Timeout:
+            pass
+
+        try:
+            val = cmd_q.get(False)
+
+            if val is None:
+                return
+
+        except Queue.Empty:
+            pass
+
+
+@contextmanager
+def start_monitoring(uri, configs):
+    deploy_and_start_sensors(uri, configs)
+    try:
+        data_q = Queue.Queue()
+        cmd_q = Queue.Queue()
+        proto = create_protocol(uri, receiver=True)
+        th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
+        th.daemon = True
+        th.start()
+
+        try:
+            yield data_q
+        finally:
+            cmd_q.put(None)
+            th.join()
+    finally:
+        stop_and_remove_sensors(configs)
diff --git a/wally/sensors/cp_protocol.py b/wally/sensors/cp_protocol.py
new file mode 100644
index 0000000..4e96afe
--- /dev/null
+++ b/wally/sensors/cp_protocol.py
@@ -0,0 +1,227 @@
+#!/usr/bin/env python
+""" Protocol class """
+
+import re
+import zlib
+import json
+import logging
+import binascii
+
+
+logger = logging.getLogger("wally")
+
+
+# the protocol contains two types of packets:
+# 1 - header, which contains a template schema of the counters
+# 2 - body, which contains only the values, in template order
+#       it uses msgpack (or a provided packer) for compactness
+#
+# a full packet has the format:
+# begin_data_prefixSIZE\n\rCRC\n\rDATAend_data_postfix
+# a packet part has the format:
+# SIZE\n\rDATA
+#
+# DATA is zlib-compressed
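+#
+# example with illustrative values (SIZE refers to the uncompressed data):
+#   first part:  "26\n\rbegin_data_prefix26\n\r123456789\n\r<zlib bytes>"
+#   middle part: "26\n\r<more zlib bytes>"
+#   last part:   "26\n\r<zlib bytes>end_data_postfix"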
+
+
+class PacketException(Exception):
+    """ Exceptions from Packet"""
+    pass
+
+
+class Packet(object):
+    """ Class proceed packet by protocol"""
+
+    prefix = "begin_data_prefix"
+    postfix = "end_data_postfix"
+    header_prefix = "template"
+    # other fields
+    # is_begin
+    # is_end
+    # crc
+    # data
+    # data_len
+
+    def __init__(self, packer):
+        # preinit
+        self.is_begin = False
+        self.is_end = False
+        self.crc = None
+        self.data = ""
+        self.data_len = None
+        self.srv_template = None
+        self.clt_template = None
+        self.tmpl_size = 0
+        self.packer = packer
+
+    def new_packet(self, part):
+        """ Feed one packet part; return the decoded dump or None """
+        # process the part
+        try:
+            # get size
+            local_size_s, _, part = part.partition("\n\r")
+            local_size = int(local_size_s)
+
+            # find prefix
+            begin = part.find(self.prefix)
+            if begin != -1:
+                # drop everything up to and including the prefix
+                from_i = begin + len(self.prefix)
+                part = part[from_i:]
+                # reset flags
+                self.is_begin = True
+                self.is_end = False
+                self.data = ""
+                # get size
+                data_len_s, _, part = part.partition("\n\r")
+                self.data_len = int(data_len_s)
+                # get crc
+                crc_s, _, part = part.partition("\n\r")
+                self.crc = int(crc_s)
+
+            # bad size?
+            if local_size != self.data_len:
+                raise PacketException("Part size error")
+
+            # find postfix
+            end = part.find(self.postfix)
+            if end != -1:
+                # divide postfix
+                part = part[:end]
+                self.is_end = True
+
+            self.data += part
+            # check if it is end
+            if self.is_end:
+                self.data = zlib.decompress(self.data)
+                if self.data_len != len(self.data):
+                    raise PacketException("Total size error")
+                if binascii.crc32(self.data) != self.crc:
+                    raise PacketException("CRC error")
+
+                # check, if it is template
+                if self.data.startswith(self.header_prefix):
+                    self.srv_template = self.data
+                    # template is for internal use
+                    return None
+
+                # decode values list
+                vals = self.packer.unpack(self.data)
+                dump = self.srv_template % tuple(vals)
+                return dump
+            else:
+                return None
+
+        except PacketException as e:
+            # if something wrong - skip packet
+            logger.warning("Packet skipped: %s", e)
+            self.is_begin = False
+            self.is_end = False
+            return None
+
+        except TypeError:
+            # if something wrong - skip packet
+            logger.warning("Packet skipped: doesn't match schema")
+            self.is_begin = False
+            self.is_end = False
+            return None
+
+        except Exception:
+            # if something at all wrong - skip packet
+            logger.warning("Packet skipped: something is wrong")
+            self.is_begin = False
+            self.is_end = False
+            return None
+
+    @staticmethod
+    def create_packet(data, part_size):
+        """ Split data into parts of at most part_size bytes
+            The payload is zlib-compressed """
+        # prepare data
+        data_len = "%i\n\r" % len(data)
+        header = "%s%s%s\n\r" % (Packet.prefix, data_len, binascii.crc32(data))
+        compact_data = zlib.compress(data)
+        packet = "%s%s%s" % (header, compact_data, Packet.postfix)
+
+        partheader_len = len(data_len)
+
+        beg = 0
+        end = part_size - partheader_len
+
+        result = []
+        while beg < len(packet):
+            block = packet[beg:beg+end]
+            result.append(data_len + block)
+            beg += end
+
+        return result
+
+    def create_packet_v2(self, data, part_size):
+        """ Split data into parts of at most part_size bytes
+            Prepends a template header on the first call """
+        result = []
+        # create and add to result template header
+        if self.srv_template is None:
+            perf_string = json.dumps(data)
+            self.create_answer_template(perf_string)
+            template = self.header_prefix + self.srv_template
+            header = Packet.create_packet(template, part_size)
+            result.extend(header)
+
+        vals = self.get_matching_value_list(data)
+        body = self.packer.pack(vals)
+        parts = Packet.create_packet(body, part_size)
+        result.extend(parts)
+        return result
+
+    def get_matching_value_list(self, data):
+        """ Get values in the order the server expects """
+        vals = range(0, self.tmpl_size)
+
+        try:
+            for node, groups in self.clt_template.items():
+                for group, counters in groups.items():
+                    for counter, index in counters.items():
+                        if not isinstance(index, dict):
+                            vals[index] = data[node][group][counter]
+                        else:
+                            for k, i in index.items():
+                                vals[i] = data[node][group][counter][k]
+
+            return vals
+
+        except (IndexError, KeyError):
+            logger.error("Data doesn't match the last schema")
+            raise PacketException("Data doesn't match the last schema")
+
+    def create_answer_template(self, perf_string):
+        """ Build the server and client templates used to insert
+            counter values; stores them and the substitution count """
+        replacer = re.compile(r": [0-9]+\.?[0-9]*")
+        # replace all values by %s
+        finditer = replacer.finditer(perf_string)
+        # the server does not need to know the positions
+        self.srv_template = ""
+        # the client needs the positions
+        clt_template = ""
+        beg = 0
+        k = 0
+        # this could be done better?
+        for match in finditer:
+            # define input place in server template
+            self.srv_template += perf_string[beg:match.start()]
+            self.srv_template += ": %s"
+            # define match number in client template
+            clt_template += perf_string[beg:match.start()]
+            clt_template += ": %i" % k
+
+            beg = match.end()
+            k += 1
+
+        # add tail
+        self.srv_template += perf_string[beg:]
+        clt_template += perf_string[beg:]
+
+        self.tmpl_size = k
+        self.clt_template = json.loads(clt_template)
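+
+
+# Round-trip sketch (packer is any object with pack()/unpack(), e.g. an
+# instance of MSGPackSerializer from wally.sensors.protocol; illustrative):
+#
+#   sender = Packet(packer)
+#   parts = sender.create_packet_v2({"node": {"cpu": {"user": 1}}}, 256)
+#   receiver = Packet(packer)
+#   for part in parts:
+#       dump = receiver.new_packet(part)   # None until a packet completes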
diff --git a/wally/sensors/cp_transport.py b/wally/sensors/cp_transport.py
new file mode 100644
index 0000000..2e00e80
--- /dev/null
+++ b/wally/sensors/cp_transport.py
@@ -0,0 +1,157 @@
+#!/usr/bin/env python
+""" UDP sender class """
+
+import socket
+import urlparse
+
+from cp_protocol import Packet
+
+try:
+    from disk_perf_test_tool.logger import define_logger
+    logger = define_logger(__name__)
+except ImportError:
+    class Logger(object):
+        """ Fallback no-op logger; must also cover the error/warning
+            calls made below """
+        def debug(self, *dt):
+            pass
+        warning = debug
+        error = debug
+    logger = Logger()
+
+
+class SenderException(Exception):
+    """ Exceptions in Sender class """
+    pass
+
+
+class Timeout(Exception):
+    """ Raised when no data arrives within the socket timeout """
+    pass
+
+
+class Sender(object):
+    """ UDP sender class """
+
+    def __init__(self, packer, url=None, port=None, host="0.0.0.0", size=256):
+        """ Create connection object from input udp string or params"""
+
+        # test input
+        if url is None and port is None:
+            raise SenderException("Bad initialization")
+        if url is not None:
+            data = urlparse.urlparse(url)
+            # check schema
+            if data.scheme != "udp":
+                mes = "Bad protocol type: %s instead of UDP" % data.scheme
+                logger.error(mes)
+                raise SenderException("Bad protocol type")
+            # try to get port
+            try:
+                int_port = int(data.port)
+            except ValueError:
+                logger.error("Bad UDP port")
+                raise SenderException("Bad UDP port")
+            # save paths
+            self.sendto = (data.hostname, int_port)
+            self.bindto = (data.hostname, int_port)
+            # try to get size
+            try:
+                self.size = int(data.path.strip("/"))
+            except ValueError:
+                logger.error("Bad packet part size")
+                raise SenderException("Bad packet part size")
+        else:
+            # url is None - use size and port
+            self.sendto = (host, port)
+            self.bindto = ("0.0.0.0", port)
+            self.size = size
+
+        self.packer = packer
+
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        self.binded = False
+        self.all_data = {}
+        self.send_packer = None
+
+    def bind(self):
+        """ Prepare for listening """
+        self.sock.bind(self.bindto)
+        self.sock.settimeout(0.5)
+        self.binded = True
+
+    def send(self, data):
+        """ Send data to udp socket"""
+        if self.sock.sendto(data, self.sendto) != len(data):
+            mes = "Cannot send data to %s:%s" % self.sendto
+            logger.error(mes)
+            raise SenderException("Cannot send data")
+
+    def send_by_protocol(self, data):
+        """ Send data by Packet protocol
+            data = dict"""
+        if self.send_packer is None:
+            self.send_packer = Packet(self.packer())
+        parts = self.send_packer.create_packet_v2(data, self.size)
+        for part in parts:
+            self.send(part)
+
+    def recv(self):
+        """ Receive data from udp socket"""
+        # check for binding
+        if not self.binded:
+            self.bind()
+        # try to recv
+        try:
+            data, (remote_ip, _) = self.sock.recvfrom(self.size)
+            return data, remote_ip
+        except socket.timeout:
+            raise Timeout()
+
+    def recv_by_protocol(self):
+        """ Receive data from udp socket by Packet protocol"""
+        data, remote_ip = self.recv()
+
+        if remote_ip not in self.all_data:
+            self.all_data[remote_ip] = Packet(self.packer())
+
+        return self.all_data[remote_ip].new_packet(data)
+
+    def recv_with_answer(self, stop_event=None):
+        """ Receive data from udp socket and send 'ok' back
+            Command port = local port + 1
+            Answer port = local port
+            Waiting for command is blocking """
+        # create command socket
+        command_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        command_port = self.bindto[1]+1
+        command_sock.bind(("0.0.0.0", command_port))
+        command_sock.settimeout(1)
+        # try to recv
+        while True:
+            try:
+                data, (remote_ip, _) = command_sock.recvfrom(self.size)
+                self.send("ok")
+                return data, remote_ip
+            except socket.timeout:
+                if stop_event is not None and stop_event.is_set():
+                    # return None if we are interrupted
+                    return None
+
+    def verified_send(self, send_host, message, max_repeat=20):
+        """ Send and verify by answer, retrying at most max_repeat times
+            Send port = local port + 1
+            Answer port = local port
+            Return True if the send was acknowledged """
+        # create send socket
+        send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        send_port = self.sendto[1]+1
+        for repeat in range(0, max_repeat):
+            send_sock.sendto(message, (send_host, send_port))
+            try:
+                data, remote_ip = self.recv()
+                if remote_ip == send_host and data == "ok":
+                    return True
+                else:
+                    logger.warning("No answer from %s, try %i",
+                                   send_host, repeat)
+            except Timeout:
+                logger.warning("No answer from %s, try %i", send_host, repeat)
+
+        return False
diff --git a/wally/sensors/daemonize.py b/wally/sensors/daemonize.py
new file mode 100644
index 0000000..a4fa157
--- /dev/null
+++ b/wally/sensors/daemonize.py
@@ -0,0 +1,226 @@
+#!/usr/bin/python
+
+import fcntl
+import os
+import pwd
+import grp
+import sys
+import signal
+import resource
+import logging
+import atexit
+from logging import handlers
+
+
+class Daemonize(object):
+    """ Daemonize object
+    Object constructor expects three arguments:
+    - app: contains the application name which will be sent to syslog.
+    - pid: path to the pidfile.
+    - action: your custom function which will be executed after daemonization.
+    - keep_fds: optional list of fds which should not be closed.
+    - auto_close_fds: optional parameter to not close opened fds.
+    - privileged_action: action that will be executed before
+                         drop privileges if user or
+                         group parameter is provided.
+                         If you want to transfer anything from privileged
+                         action to action, such as opened privileged file
+                         descriptor, you should return it from
+                         privileged_action function and catch it inside action
+                         function.
+    - user: drop privileges to this user if provided.
+    - group: drop privileges to this group if provided.
+    - verbose: send debug messages to logger if provided.
+    - logger: use this logger object instead of creating new one, if provided.
+    """
+    def __init__(self, app, pid, action, keep_fds=None, auto_close_fds=True,
+                 privileged_action=None, user=None, group=None, verbose=False,
+                 logger=None):
+        self.app = app
+        self.pid = pid
+        self.action = action
+        self.keep_fds = keep_fds or []
+        self.privileged_action = privileged_action or (lambda: ())
+        self.user = user
+        self.group = group
+        self.logger = logger
+        self.verbose = verbose
+        self.auto_close_fds = auto_close_fds
+
+    def sigterm(self, signum, frame):
+        """ sigterm method
+        These actions will be done after SIGTERM.
+        """
+        self.logger.warn("Caught signal %s. Stopping daemon." % signum)
+        os.remove(self.pid)
+        sys.exit(0)
+
+    def exit(self):
+        """ exit method
+        Cleanup pid file at exit.
+        """
+        self.logger.warn("Stopping daemon.")
+        os.remove(self.pid)
+        sys.exit(0)
+
+    def start(self):
+        """ start method
+        Main daemonization process.
+        """
+        # If the pidfile already exists, read the old pid first, so that
+        # we can restore it if locking fails (the locking attempt below
+        # purges the file contents).
+        if os.path.isfile(self.pid):
+            with open(self.pid, "r") as old_pidfile:
+                old_pid = old_pidfile.read()
+        # Create a lockfile so that only one instance of this daemon is
+        # running at any time.
+        try:
+            lockfile = open(self.pid, "w")
+        except IOError:
+            print("Unable to create the pidfile.")
+            sys.exit(1)
+        try:
+            # Try to get an exclusive lock on the file. This will fail if
+            # another process has the file
+            # locked.
+            fcntl.flock(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
+        except IOError:
+            print("Unable to lock on the pidfile.")
+            # We need to overwrite the pidfile if we got here.
+            with open(self.pid, "w") as pidfile:
+                pidfile.write(old_pid)
+            sys.exit(1)
+
+        # Fork, creating a new process for the child.
+        process_id = os.fork()
+        if process_id < 0:
+            # Fork error. Exit badly.
+            sys.exit(1)
+        elif process_id != 0:
+            # This is the parent process. Exit.
+            sys.exit(0)
+        # This is the child process. Continue.
+
+        # Stop listening for signals that the parent process receives.
+        # This is done by getting a new process id.
+        # setpgrp() is an alternative to setsid().
+        # setsid puts the process in a new session and detaches
+        # its controlling terminal.
+        process_id = os.setsid()
+        if process_id == -1:
+            # Uh oh, there was a problem.
+            sys.exit(1)
+
+        # Add lockfile to self.keep_fds.
+        self.keep_fds.append(lockfile.fileno())
+
+        # Close all file descriptors, except the ones mentioned in
+        # self.keep_fds.
+        devnull = "/dev/null"
+        if hasattr(os, "devnull"):
+            # Python has set os.devnull on this system, use it instead as it
+            # might be different
+            # than /dev/null.
+            devnull = os.devnull
+
+        if self.auto_close_fds:
+            for fd in range(3, resource.getrlimit(resource.RLIMIT_NOFILE)[0]):
+                if fd not in self.keep_fds:
+                    try:
+                        os.close(fd)
+                    except OSError:
+                        pass
+
+        devnull_fd = os.open(devnull, os.O_RDWR)
+        os.dup2(devnull_fd, 0)
+        os.dup2(devnull_fd, 1)
+        os.dup2(devnull_fd, 2)
+
+        if self.logger is None:
+            # Initialize logging.
+            self.logger = logging.getLogger(self.app)
+            self.logger.setLevel(logging.DEBUG)
+            # Display log messages only on defined handlers.
+            self.logger.propagate = False
+
+            # Initialize syslog.
+            # It will correctly work on OS X, Linux and FreeBSD.
+            if sys.platform == "darwin":
+                syslog_address = "/var/run/syslog"
+            else:
+                syslog_address = "/dev/log"
+
+            # We will continue with syslog initialization only if the
+            # machine actually provides a syslog socket.
+            if os.path.exists(syslog_address):
+                syslog = handlers.SysLogHandler(syslog_address)
+                if self.verbose:
+                    syslog.setLevel(logging.DEBUG)
+                else:
+                    syslog.setLevel(logging.INFO)
+                # Try to mimic to normal syslog messages.
+                format_t = "%(asctime)s %(name)s: %(message)s"
+                formatter = logging.Formatter(format_t,
+                                              "%b %e %H:%M:%S")
+                syslog.setFormatter(formatter)
+
+                self.logger.addHandler(syslog)
+
+        # Set umask to default to safe file permissions when running
+        # as a root daemon. 027 is an
+        # octal number which we are typing as 0o27 for Python3 compatibility.
+        os.umask(0o27)
+
+        # Change to a known directory. If this isn't done, starting a daemon
+        # in a subdirectory that
+        # needs to be deleted results in "directory busy" errors.
+        os.chdir("/")
+
+        # Execute privileged action
+        privileged_action_result = self.privileged_action()
+        if not privileged_action_result:
+            privileged_action_result = []
+
+        # Change gid
+        if self.group:
+            try:
+                gid = grp.getgrnam(self.group).gr_gid
+            except KeyError:
+                self.logger.error("Group {0} not found".format(self.group))
+                sys.exit(1)
+            try:
+                os.setgid(gid)
+            except OSError:
+                self.logger.error("Unable to change gid.")
+                sys.exit(1)
+
+        # Change uid
+        if self.user:
+            try:
+                uid = pwd.getpwnam(self.user).pw_uid
+            except KeyError:
+                self.logger.error("User {0} not found.".format(self.user))
+                sys.exit(1)
+            try:
+                os.setuid(uid)
+            except OSError:
+                self.logger.error("Unable to change uid.")
+                sys.exit(1)
+
+        try:
+            lockfile.write("%s" % (os.getpid()))
+            lockfile.flush()
+        except IOError:
+            self.logger.error("Unable to write pid to the pidfile.")
+            print("Unable to write pid to the pidfile.")
+            sys.exit(1)
+
+        # Set custom action on SIGTERM.
+        signal.signal(signal.SIGTERM, self.sigterm)
+        atexit.register(self.exit)
+
+        self.logger.warn("Starting daemon.")
+
+        self.action(*privileged_action_result)
diff --git a/wally/sensors/deploy_sensors.py b/wally/sensors/deploy_sensors.py
new file mode 100644
index 0000000..249adfb
--- /dev/null
+++ b/wally/sensors/deploy_sensors.py
@@ -0,0 +1,87 @@
+import time
+import json
+import os.path
+import logging
+
+from concurrent.futures import ThreadPoolExecutor, wait
+
+from wally.ssh_utils import copy_paths, run_over_ssh
+
+logger = logging.getLogger('wally')
+
+
+def wait_all_ok(futures):
+    return all(future.result() for future in futures)
+
+
+def deploy_and_start_sensors(monitor_uri, sensor_configs,
+                             remote_path='/tmp/sensors/sensors'):
+
+    paths = {os.path.dirname(__file__): remote_path}
+    with ThreadPoolExecutor(max_workers=32) as executor:
+        futures = []
+
+        for node_sensor_config in sensor_configs:
+            futures.append(executor.submit(deploy_and_start_sensor,
+                                           paths,
+                                           node_sensor_config,
+                                           monitor_uri,
+                                           remote_path))
+
+        if not wait_all_ok(futures):
+            raise RuntimeError("Sensor deployment failed on some nodes")
+
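+# Input sketch (illustrative values): sensor_configs is a list of
+# SensorConfig objects from wally.sensors.api, e.g.
+#   configs = [SensorConfig(ssh_conn, "node1", {"block-io": {}})]
+#   deploy_and_start_sensors("udp://monitor:5699", configs)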
+
+def deploy_and_start_sensor(paths, node_sensor_config,
+                            monitor_uri, remote_path):
+    try:
+        copy_paths(node_sensor_config.conn, paths)
+        sftp = node_sensor_config.conn.open_sftp()
+
+        config_remote_path = os.path.join(remote_path, "conf.json")
+
+        with sftp.open(config_remote_path, "w") as fd:
+            fd.write(json.dumps(node_sensor_config.sensors))
+
+        cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
+                    "sensors.main -d start -u {1} {2}"
+
+        cmd = cmd_templ.format(os.path.dirname(remote_path),
+                               monitor_uri,
+                               config_remote_path)
+
+        run_over_ssh(node_sensor_config.conn, cmd,
+                     node=node_sensor_config.url)
+        sftp.close()
+
+    except Exception:
+        msg = "Failed to deploy sensors on {0}".format(node_sensor_config.url)
+        logger.exception(msg)
+        return False
+    return True
+
+
+def stop_and_remove_sensor(conn, url, remote_path):
+    cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'
+
+    run_over_ssh(conn, cmd.format(remote_path), node=url)
+
+    # give the daemon a moment to shut down
+    time.sleep(0.3)
+
+    conn.exec_command("rm -rf {0}".format(remote_path))
+
+    logger.debug("Sensors stopped and removed")
+
+
+def stop_and_remove_sensors(configs, remote_path='/tmp/sensors'):
+    with ThreadPoolExecutor(max_workers=32) as executor:
+        futures = []
+
+        for node_sensor_config in configs:
+            futures.append(executor.submit(stop_and_remove_sensor,
+                                           node_sensor_config.conn,
+                                           node_sensor_config.url,
+                                           remote_path))
+
+        wait(futures)
diff --git a/wally/sensors/discover.py b/wally/sensors/discover.py
new file mode 100644
index 0000000..f227043
--- /dev/null
+++ b/wally/sensors/discover.py
@@ -0,0 +1,9 @@
+all_sensors = {}
+
+
+def provides(sensor_class_name):
+    def closure(func):
+        assert sensor_class_name not in all_sensors
+        all_sensors[sensor_class_name] = func
+        return func
+    return closure
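+
+
+# Usage sketch (illustrative sensor): register a collector under a name:
+#   @provides("my-sensor")
+#   def my_stat(disallowed_prefixes=None, allowed_prefixes=None):
+#       return {"dev.counter": SensorInfo(0, True)}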
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
new file mode 100644
index 0000000..3753e7c
--- /dev/null
+++ b/wally/sensors/main.py
@@ -0,0 +1,118 @@
+import os
+import sys
+import time
+import json
+import glob
+import signal
+import os.path
+import argparse
+
+from .sensors.utils import SensorInfo
+from .daemonize import Daemonize
+from .discover import all_sensors
+from .protocol import create_protocol
+
+
+# load all sensors
+from . import sensors
+sensors_dir = os.path.dirname(sensors.__file__)
+for fname in glob.glob(os.path.join(sensors_dir, "*.py")):
+    mod_name = os.path.basename(fname[:-3])
+    __import__("sensors.sensors." + mod_name)
+
+
+def get_values(required_sensors):
+    result = {}
+    for sensor_name, params in required_sensors:
+        if sensor_name in all_sensors:
+            result.update(all_sensors[sensor_name](**params))
+        else:
+            msg = "Sensor {0!r} isn't available".format(sensor_name)
+            raise ValueError(msg)
+    return time.time(), result
+
+
+def parse_args(args):
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-d', '--daemon',
+                        choices=('start', 'stop', 'status'),
+                        default=None)
+
+    parser.add_argument('-u', '--url', default='stdout://')
+    parser.add_argument('-t', '--timeout', type=float, default=1)
+    parser.add_argument('-l', '--list-sensors', action='store_true')
+    parser.add_argument('sensors_config', type=argparse.FileType('r'),
+                        default=None, nargs='?')
+    return parser.parse_args(args[1:])
+
+
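+# required_sensors sketch: maps a sensor name to its kwargs, e.g.
+#   {"block-io": {"allowed_prefixes": ["sda1", "rbd1"]}, "system-cpu": {}}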
+def daemon_main(required_sensors, opts):
+    sender = create_protocol(opts.url)
+    prev = {}
+
+    while True:
+        gtime, data = get_values(required_sensors.items())
+        curr = {'time': SensorInfo(gtime, True)}
+        for name, val in data.items():
+            if val.is_accumulated:
+                if name in prev:
+                    curr[name] = SensorInfo(val.value - prev[name], True)
+                prev[name] = val.value
+            else:
+                curr[name] = SensorInfo(val.value, False)
+        sender.send(curr)
+        time.sleep(opts.timeout)
+
+
+def pid_running(pid):
+    return os.path.exists("/proc/" + str(pid))
+
+
+def main(argv):
+    opts = parse_args(argv)
+
+    if opts.list_sensors:
+        print "\n".join(sorted(all_sensors))
+        return 0
+
+    if opts.daemon is not None:
+        pid_file = "/tmp/sensors.pid"
+        if opts.daemon == 'start':
+            required_sensors = json.loads(opts.sensors_config.read())
+
+            def root_func():
+                daemon_main(required_sensors, opts)
+
+            daemon = Daemonize(app="perfcollect_app",
+                               pid=pid_file,
+                               action=root_func)
+            daemon.start()
+        elif opts.daemon == 'stop':
+            if os.path.isfile(pid_file):
+                pid = int(open(pid_file).read())
+                if pid_running(pid):
+                    os.kill(pid, signal.SIGTERM)
+
+                time.sleep(0.1)
+
+                if pid_running(pid):
+                    os.kill(pid, signal.SIGKILL)
+
+                if os.path.isfile(pid_file):
+                    os.unlink(pid_file)
+        elif opts.daemon == 'status':
+            if os.path.isfile(pid_file):
+                pid = int(open(pid_file).read())
+                if pid_running(pid):
+                    print "running"
+                    return
+            print "stopped"
+        else:
+            raise ValueError("Unknown daemon operation {}".format(opts.daemon))
+    else:
+        required_sensors = json.loads(opts.sensors_config.read())
+        daemon_main(required_sensors, opts)
+    return 0
+
+if __name__ == "__main__":
+    exit(main(sys.argv))
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
new file mode 100644
index 0000000..c2ace01
--- /dev/null
+++ b/wally/sensors/protocol.py
@@ -0,0 +1,192 @@
+import sys
+import time
+import socket
+import select
+import cPickle as pickle
+from urlparse import urlparse
+
+from . import cp_transport
+
+
+class Timeout(Exception):
+    pass
+
+
+# ------------------------------------- Serializers --------------------------
+
+
+class ISensorResultsSerializer(object):
+    def pack(self, data):
+        pass
+
+    def unpack(self, data):
+        pass
+
+
+class PickleSerializer(ISensorResultsSerializer):
+    def pack(self, data):
+        ndata = {key: val.value for key, val in data.items()}
+        return pickle.dumps(ndata)
+
+    def unpack(self, data):
+        return pickle.loads(data)
+
+try:
+    # prefer the full-featured native msgpack lib
+    import msgpack
+
+    class MsgpackSerializer(ISensorResultsSerializer):
+        def pack(self, data):
+            return msgpack.packb(data)
+
+        def unpack(self, data):
+            return msgpack.unpackb(data)
+
+    MSGPackSerializer = MsgpackSerializer
+except ImportError:
+    # fall back to the pure-python umsgpack lib
+    import umsgpack
+
+    class UmsgpackSerializer(ISensorResultsSerializer):
+        def pack(self, data):
+            return umsgpack.packb(data)
+
+        def unpack(self, data):
+            return umsgpack.unpackb(data)
+
+    MSGPackSerializer = UmsgpackSerializer
+
+# ------------------------------------- Transports ---------------------------
+
+
+class ITransport(object):
+    def __init__(self, receiver):
+        pass
+
+    def send(self, data):
+        pass
+
+    def recv(self, timeout=None):
+        pass
+
+
+class StdoutTransport(ITransport):
+    MIN_COL_WIDTH = 10
+
+    def __init__(self, receiver, delta=True):
+        if receiver:
+            cname = self.__class__.__name__
+            raise ValueError("{0} doesn't allow receiving".format(cname))
+
+        self.headers = None
+        self.line_format = ""
+        self.prev = {}
+        self.delta = delta
+        self.fd = sys.stdout
+
+    def send(self, data):
+        if self.headers is None:
+            self.headers = sorted(data)
+
+            for pos, header in enumerate(self.headers):
+                self.line_format += "{%s:>%s}" % (pos,
+                                                  max(len(header) + 1,
+                                                      self.MIN_COL_WIDTH))
+
+            self.fd.write(self.line_format.format(*self.headers) + "\n")
+
+        if self.delta:
+            vals = [data[header].value - self.prev.get(header, 0)
+                    for header in self.headers]
+
+            self.prev.update({header: data[header].value
+                              for header in self.headers})
+        else:
+            vals = [data[header].value for header in self.headers]
+
+        self.fd.write(self.line_format.format(*vals) + "\n")
+
+    def recv(self, timeout=None):
+        cname = self.__class__.__name__
+        raise ValueError("{0} doesn't allow receiving".format(cname))
+
+
+class FileTransport(StdoutTransport):
+    def __init__(self, receiver, fname, delta=True):
+        StdoutTransport.__init__(self, receiver, delta)
+        self.fd = open(fname, "w")
+
+
+class UDPTransport(ITransport):
+    def __init__(self, receiver, ip, port, packer_cls):
+        self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        if receiver:
+            self.port.bind((ip, port))
+            self.packer_cls = packer_cls
+            self.packers = {}
+        else:
+            self.packer = packer_cls()
+            self.dst = (ip, port)
+
+    def send(self, data):
+        raw_data = self.packer.pack(data)
+        self.port.sendto(raw_data, self.dst)
+
+    def recv(self, timeout=None):
+        r, _, _ = select.select([self.port], [], [], timeout)
+        if len(r) != 0:
+            raw_data, addr = self.port.recvfrom(10000)
+            packer = self.packers.setdefault(addr, self.packer_cls())
+            return addr, packer.unpack(raw_data)
+        else:
+            raise Timeout()
+
+
+class HugeUDPTransport(ITransport, cp_transport.Sender):
+    def __init__(self, receiver, ip, port, packer_cls):
+        # Sender requires the packer as its first argument
+        cp_transport.Sender.__init__(self, packer_cls, port=port, host=ip)
+        if receiver:
+            self.bind()
+
+    def send(self, data):
+        self.send_by_protocol(data)
+
+    def recv(self, timeout=None):
+        begin = time.time()
+
+        while True:
+            try:
+                # recv_by_protocol returns not None once a packet is complete
+                ready = self.recv_by_protocol()
+                if ready is not None:
+                    return ready
+                # data isn't ready yet - check whether time has run out
+                if time.time() - begin >= timeout:
+                    break
+
+            except cp_transport.Timeout:
+                # no answer yet - check whether time has run out
+                if time.time() - begin >= timeout:
+                    break
+
+        raise Timeout()
+
+
+# -------------------------- Factory function --------------------------------
+
+
+def create_protocol(uri, receiver=False):
+    parsed_uri = urlparse(uri)
+    if parsed_uri.scheme == 'stdout':
+        return StdoutTransport(receiver)
+    elif parsed_uri.scheme == 'udp':
+        ip, port = parsed_uri.netloc.split(":")
+        return UDPTransport(receiver, ip=ip, port=int(port),
+                            packer_cls=PickleSerializer)
+    elif parsed_uri.scheme == 'file':
+        return FileTransport(receiver, parsed_uri.path)
+    elif parsed_uri.scheme == 'hugeudp':
+        ip, port = parsed_uri.netloc.split(":")
+        return HugeUDPTransport(receiver, ip=ip, port=int(port),
+                                packer_cls=MSGPackSerializer)
+    else:
+        templ = "Can't instantiate transport from {0!r}"
+        raise ValueError(templ.format(uri))
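+
+
+# URI examples (sketch, illustrative addresses):
+#   create_protocol("stdout://")                 -> StdoutTransport (sender)
+#   create_protocol("file:///tmp/sensors.log")   -> FileTransport (sender)
+#   create_protocol("udp://127.0.0.1:5699")      -> UDPTransport (sender)
+#   create_protocol("udp://0.0.0.0:5699", True)  -> UDPTransport (receiver)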
diff --git a/wally/sensors/receiver.py b/wally/sensors/receiver.py
new file mode 100644
index 0000000..ff0f223
--- /dev/null
+++ b/wally/sensors/receiver.py
@@ -0,0 +1,19 @@
+from .api import start_monitoring, Empty
+# from influx_exporter import connect, add_data
+
+uri = "udp://192.168.0.104:12001"
+# infldb_url = "influxdb://perf:perf@192.168.152.42:8086/perf"
+# conn = connect(infldb_url)
+
+monitor_config = {'127.0.0.1':
+                  {"block-io": {'allowed_prefixes': ['sda1', 'rbd1']},
+                   "net-io": {"allowed_prefixes": ["virbr2"]}}}
+
+with start_monitoring(uri, monitor_config) as queue:
+    while True:
+        try:
+            (ip, port), data = queue.get(True, 1)
+            print (ip, port), data
+            # add_data(conn, ip, [data])
+        except Empty:
+            pass
diff --git a/wally/sensors/sensors/__init__.py b/wally/sensors/sensors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wally/sensors/sensors/__init__.py
diff --git a/wally/sensors/sensors/io_sensors.py b/wally/sensors/sensors/io_sensors.py
new file mode 100644
index 0000000..c9ff340
--- /dev/null
+++ b/wally/sensors/sensors/io_sensors.py
@@ -0,0 +1,72 @@
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
+
+#  1 - major number
+#  2 - minor mumber
+#  3 - device name
+#  4 - reads completed successfully
+#  5 - reads merged
+#  6 - sectors read
+#  7 - time spent reading (ms)
+#  8 - writes completed
+#  9 - writes merged
+# 10 - sectors written
+# 11 - time spent writing (ms)
+# 12 - I/Os currently in progress
+# 13 - time spent doing I/Os (ms)
+# 14 - weighted time spent doing I/Os (ms)
+
+io_values_pos = [
+    (3, 'reads_completed', True),
+    (5, 'sectors_read', True),
+    (6, 'rtime', True),
+    (7, 'writes_completed', True),
+    (9, 'sectors_written', True),
+    (10, 'wtime', True),
+    (11, 'io_queue', False),
+    (13, 'io_time', True)
+]
+
+
+@provides("block-io")
+def io_stat(disallowed_prefixes=('ram', 'loop'), allowed_prefixes=None):
+    results = {}
+    for line in open('/proc/diskstats'):
+        vals = line.split()
+        dev_name = vals[2]
+
+        dev_ok = is_dev_accepted(dev_name,
+                                 disallowed_prefixes,
+                                 allowed_prefixes)
+
+        if dev_ok:
+            for pos, name, accum_val in io_values_pos:
+                sensor_name = "{0}.{1}".format(dev_name, name)
+                results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+    return results
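+
+# io_stat() result sketch (illustrative values): a flat dict keyed
+# "<device>.<counter>", e.g.
+#   {"sda1.reads_completed": SensorInfo(value=12345, is_accumulated=True)}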
+
+
+def get_latency(stat1, stat2):
+    disks = set(i.split('.', 1)[0] for i in stat1)
+    results = {}
+
+    for disk in disks:
+        rdc = disk + '.reads_completed'
+        wrc = disk + '.writes_completed'
+        rdt = disk + '.rtime'
+        wrt = disk + '.wtime'
+        lat = 0.0
+
+        io_ops1 = stat1[rdc].value + stat1[wrc].value
+        io_ops2 = stat2[rdc].value + stat2[wrc].value
+
+        diops = io_ops2 - io_ops1
+
+        if diops != 0:
+            io1 = stat1[rdt].value + stat1[wrt].value
+            io2 = stat2[rdt].value + stat2[wrt].value
+            lat = abs(float(io1 - io2)) / diops
+
+        results[disk + '.latence'] = SensorInfo(lat, False)
+
+    return results
diff --git a/wally/sensors/sensors/net_sensors.py b/wally/sensors/sensors/net_sensors.py
new file mode 100644
index 0000000..4a4e477
--- /dev/null
+++ b/wally/sensors/sensors/net_sensors.py
@@ -0,0 +1,43 @@
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
+
+# /proc/net/dev fields, per interface (after the "iface:" prefix):
+#  0 - bytes received
+#  1 - packets received
+#  2 - receive errors
+#  3 - receive drops
+#  4 - receive fifo errors
+#  5 - receive frame errors
+#  6 - compressed packets received
+#  7 - multicast frames received
+#  8 - bytes transmitted
+#  9 - packets transmitted
+# 10 - transmit errors
+# 11 - transmit drops
+# 12 - transmit fifo errors
+# 13 - collisions
+
+net_values_pos = [
+    (0, 'recv_bytes', True),
+    (1, 'recv_packets', True),
+    (8, 'send_bytes', True),
+    (9, 'send_packets', True),
+]
+
+
+@provides("net-io")
+def net_stat(disallowed_prefixes=('docker',), allowed_prefixes=None):
+    results = {}
+
+    for line in open('/proc/net/dev').readlines()[2:]:
+        dev_name, stats = line.split(":", 1)
+        dev_name = dev_name.strip()
+        vals = stats.split()
+
+        dev_ok = is_dev_accepted(dev_name,
+                                 disallowed_prefixes,
+                                 allowed_prefixes)
+        if dev_ok:
+            for pos, name, accum_val in net_values_pos:
+                sensor_name = "{0}.{1}".format(dev_name, name)
+                results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+    return results
diff --git a/wally/sensors/sensors/pscpu_sensors.py b/wally/sensors/sensors/pscpu_sensors.py
new file mode 100644
index 0000000..cffdb71
--- /dev/null
+++ b/wally/sensors/sensors/pscpu_sensors.py
@@ -0,0 +1,37 @@
+import os
+
+from ..discover import provides
+from .utils import SensorInfo, get_pid_name, get_pid_list
+
+
+@provides("perprocess-cpu")
+def pscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
+    results = {}
+    pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
+
+    for pid in pid_list:
+        try:
+            dev_name = get_pid_name(pid)
+
+            pid_stat1 = pid_stat(pid)
+
+            sensor_name = "{0}.{1}".format(dev_name, pid)
+            results[sensor_name] = SensorInfo(pid_stat1, True)
+        except IOError:
+            # the process may have already terminated - skip it
+            continue
+    return results
+
+
+def pid_stat(pid):
+    """ Return total cpu usage time from process"""
+    # read /proc/pid/stat
+    with open(os.path.join('/proc/', pid, 'stat'), 'r') as pidfile:
+        proctimes = pidfile.readline().split()
+    # get utime from /proc/<pid>/stat, 14 item
+    utime = proctimes[13]
+    # get stime from proc/<pid>/stat, 15 item
+    stime = proctimes[14]
+    # count total process used time
+    proctotal = int(utime) + int(stime)
+    return float(proctotal)
diff --git a/wally/sensors/sensors/psram_sensors.py b/wally/sensors/sensors/psram_sensors.py
new file mode 100644
index 0000000..cbd85e6
--- /dev/null
+++ b/wally/sensors/sensors/psram_sensors.py
@@ -0,0 +1,76 @@
+from ..discover import provides
+from .utils import SensorInfo, get_pid_name, get_pid_list
+
+
+# Based on ps_mem.py:
+# Licence: LGPLv2
+# Author:  P@draigBrady.com
+# Source:  http://www.pixelbeat.org/scripts/ps_mem.py
+#   http://github.com/pixelb/scripts/commits/master/scripts/ps_mem.py
+
+
+# Note shared is always a subset of rss (trs is not always)
+def get_mem_stats(pid):
+    """ Return memory data of pid in format (private, shared) """
+
+    fname = '/proc/{0}/{1}'.format(pid, "smaps")
+    lines = open(fname).readlines()
+
+    shared = 0
+    private = 0
+    pss = 0
+
+    # add 0.5 KiB per line as the average error due to truncation
+    pss_adjust = 0.5
+
+    for line in lines:
+        if line.startswith("Shared"):
+            shared += int(line.split()[1])
+
+        if line.startswith("Private"):
+            private += int(line.split()[1])
+
+        if line.startswith("Pss"):
+            pss += float(line.split()[1]) + pss_adjust
+
+    # Note Shared + Private = Rss above
+    # The Rss in smaps includes video card mem etc.
+
+    if pss != 0:
+        shared = int(pss - private)
+
+    return (private, shared)
+
+
+@provides("perprocess-ram")
+def psram_stat(disallowed_prefixes=None, allowed_prefixes=None):
+    results = {}
+    pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
+    for pid in pid_list:
+        try:
+            dev_name = get_pid_name(pid)
+
+            private, shared = get_mem_stats(pid)
+            total = private + shared
+            sys_total = get_ram_size()
+            usage = float(total) / float(sys_total)
+
+            sensor_name = "{0}.{1}".format(dev_name, pid)
+
+            results[sensor_name + ".private_mem"] = SensorInfo(private, False)
+            results[sensor_name + ".shared_mem"] = SensorInfo(shared, False)
+            results[sensor_name + ".used_mem"] = SensorInfo(total, False)
+            name = sensor_name + ".mem_usage_percent"
+            results[name] = SensorInfo(usage * 100, False)
+        except IOError:
+            # permission denied, or the process died
+            continue
+    return results
+
+
+def get_ram_size():
+    """ Return RAM size in KiB """
+    with open("/proc/meminfo") as proc:
+        mem_total = proc.readline().split()
+    return int(mem_total[1])
diff --git a/wally/sensors/sensors/syscpu_sensors.py b/wally/sensors/sensors/syscpu_sensors.py
new file mode 100644
index 0000000..d3da02b
--- /dev/null
+++ b/wally/sensors/sensors/syscpu_sensors.py
@@ -0,0 +1,40 @@
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
+
+# 0 - cpu name
+# 1 - user: normal processes executing in user mode
+# 2 - nice: niced processes executing in user mode
+# 3 - system: processes executing in kernel mode
+# 4 - idle: twiddling thumbs
+# 5 - iowait: waiting for I/O to complete
+# 6 - irq: servicing interrupts
+# 7 - softirq: servicing softirqs
+
+cpu_values_pos = [
+    (1, 'user_processes', True),
+    (2, 'nice_processes', True),
+    (3, 'system_processes', True),
+    (4, 'idle_time', True),
+]
+
+
+@provides("system-cpu")
+def syscpu_stat(disallowed_prefixes=('intr', 'ctxt', 'btime', 'processes',
+                                 'procs_running', 'procs_blocked', 'softirq'),
+            allowed_prefixes=None):
+    results = {}
+
+    for line in open('/proc/stat'):
+        vals = line.split()
+        dev_name = vals[0]
+
+        dev_ok = is_dev_accepted(dev_name,
+                                 disallowed_prefixes,
+                                 allowed_prefixes)
+
+        if dev_ok:
+            for pos, name, accum_val in cpu_values_pos:
+                sensor_name = "{0}.{1}".format(dev_name, name)
+                results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+    return results
+
diff --git a/wally/sensors/sensors/sysram_sensors.py b/wally/sensors/sensors/sysram_sensors.py
new file mode 100644
index 0000000..c78eddd
--- /dev/null
+++ b/wally/sensors/sensors/sysram_sensors.py
@@ -0,0 +1,34 @@
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
+
+
+# report these fields, or those set via allowed_prefixes
+ram_fields = [
+    'MemTotal',
+    'MemFree',
+    'Buffers',
+    'Cached',
+    'SwapCached',
+    'Dirty',
+    'Writeback',
+    'SwapTotal',
+    'SwapFree'
+]
+
+
+@provides("system-ram")
+def sysram_stat(disallowed_prefixes=None, allowed_prefixes=None):
+    if allowed_prefixes is None:
+        allowed_prefixes = ram_fields
+    results = {}
+    for line in open('/proc/meminfo'):
+        vals = line.split()
+        dev_name = vals[0]
+
+        dev_ok = is_dev_accepted(dev_name,
+                                 disallowed_prefixes,
+                                 allowed_prefixes)
+
+        if dev_ok:
+            results[dev_name] = SensorInfo(int(vals[1]), False)
+    return results
diff --git a/wally/sensors/sensors/utils.py b/wally/sensors/sensors/utils.py
new file mode 100644
index 0000000..ad08676
--- /dev/null
+++ b/wally/sensors/sensors/utils.py
@@ -0,0 +1,83 @@
+import os
+
+from collections import namedtuple
+
+SensorInfo = namedtuple("SensorInfo", ['value', 'is_accumulated'])
+
+
+def is_dev_accepted(name, disallowed_prefixes, allowed_prefixes):
+    dev_ok = True
+
+    if disallowed_prefixes is not None:
+        dev_ok = all(not name.startswith(prefix)
+                     for prefix in disallowed_prefixes)
+
+    if dev_ok and allowed_prefixes is not None:
+        dev_ok = any(name.startswith(prefix)
+                     for prefix in allowed_prefixes)
+
+    return dev_ok
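+
+# e.g. is_dev_accepted("sda1", ("ram", "loop"), None)  -> True
+#      is_dev_accepted("loop0", ("ram", "loop"), None) -> False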
+
+
+def get_pid_list(disallowed_prefixes, allowed_prefixes):
+    """ Return a list of pids, filtered by pid or process-name prefix """
+    # disallowed entries
+    but = disallowed_prefixes if disallowed_prefixes is not None else []
+    if allowed_prefixes is None:
+        # if nothing is set, return all pids except the disallowed ones
+        result = [pid
+                  for pid in os.listdir('/proc')
+                  if pid.isdigit() and pid not in but]
+    else:
+        result = []
+        for pid in os.listdir('/proc'):
+            if pid.isdigit() and pid not in but:
+                name = get_pid_name(pid)
+                if pid in allowed_prefixes or \
+                   any(name.startswith(val) for val in allowed_prefixes):
+                    # this pid is allowed
+                    result.append(pid)
+    return result
+
+
+def get_pid_name(pid):
+    """ Return name by pid """
+    try:
+        with open(os.path.join('/proc/', pid, 'cmdline'), 'r') as pidfile:
+            try:
+                cmd = pidfile.readline().split()[0]
+                return os.path.basename(cmd).rstrip('\x00')
+            except IndexError:
+                # no cmd returned
+                return "<NO NAME>"
+    except IOError:
+        # the caller expects some string even if /proc couldn't be read
+        return "no_such_process"
+
+
+def delta(func, only_upd=True):
+    prev = {}
+    while True:
+        for dev_name, vals in func():
+            if dev_name not in prev:
+                prev[dev_name] = {}
+                for name, (val, _) in vals.items():
+                    prev[dev_name][name] = val
+            else:
+                dev_prev = prev[dev_name]
+                res = {}
+                for stat_name, (val, accum_val) in vals.items():
+                    if accum_val:
+                        if stat_name in dev_prev:
+                            delta = int(val) - int(dev_prev[stat_name])
+                            if not only_upd or 0 != delta:
+                                res[stat_name] = str(delta)
+                        dev_prev[stat_name] = val
+                    elif not only_upd or '0' != val:
+                        res[stat_name] = val
+
+                if only_upd and len(res) == 0:
+                    continue
+                yield dev_name, res
+        yield None, None
diff --git a/wally/sensors/storage/__init__.py b/wally/sensors/storage/__init__.py
new file mode 100644
index 0000000..d7bf6aa
--- /dev/null
+++ b/wally/sensors/storage/__init__.py
@@ -0,0 +1,104 @@
+import struct
+
+
+def pack(val, tp=True):
+    if isinstance(val, int):
+        assert 0 <= val < 2 ** 16
+
+        if tp:
+            res = 'i'
+        else:
+            res = ""
+
+        res += struct.pack("!U", val)
+    elif isinstance(val, dict):
+        assert len(val) < 2 ** 16
+        if tp:
+            res = "d"
+        else:
+            res = ""
+
+        res += struct.pack("!U", len(val))
+        for k, v in dict.items():
+            assert 0 <= k < 2 ** 16
+            assert 0 <= v < 2 ** 32
+            res += struct.pack("!UI", k, v)
+    elif isinstance(val, str):
+        assert len(val) < 256
+        if tp:
+            res = "s"
+        else:
+            res = ""
+        res += chr(len(val)) + val
+    else:
+        raise ValueError()
+
+    return res
+
+
+def unpack(fd, tp=None):
+    if tp is None:
+        tp = fd.read(1)
+
+    if tp == 'i':
+        return struct.unpack("!H", fd.read(2))[0]
+    elif tp == 'd':
+        res = {}
+        val_len = struct.unpack("!H", fd.read(2))[0]
+        for _ in range(val_len):
+            k, v = struct.unpack("!HI", fd.read(6))
+            res[k] = v
+        return res
+    elif tp == 's':
+        val_len = struct.unpack("!H", fd.read(2))[0]
+        return fd.read(val_len)
+
+    raise ValueError()
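+
+# Wire format sketch: an optional one-byte type tag ('i' / 'd' / 's')
+# followed by a big-endian payload, e.g. pack(5) == 'i' + struct.pack("!H", 5)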
+
+
+class LocalStorage(object):
+    NEW_DATA = 0
+    NEW_SENSOR = 1
+    NEW_SOURCE = 2
+
+    def __init__(self, fd):
+        self.fd = fd
+        self.sensor_ids = {}
+        self.sources_ids = {}
+        self.max_source_id = 0
+        self.max_sensor_id = 0
+
+    def add_data(self, source, sensor_values):
+        source_id = self.sources_ids.get(source)
+        if source_id is None:
+            source_id = self.max_source_id
+            self.sources_ids[source] = source_id
+            self.emit(self.NEW_SOURCE, source_id, source)
+            self.max_source_id += 1
+
+        new_sensor_values = {}
+
+        for name, val in sensor_values.items():
+            sensor_id = self.sensor_ids.get(name)
+            if sensor_id is None:
+                sensor_id = self.max_sensor_id
+                self.sensor_ids[name] = sensor_id
+                self.emit(self.NEW_SENSOR, sensor_id, name)
+                self.max_sensor_id += 1
+            new_sensor_values[sensor_id] = val
+
+        self.emit(self.NEW_DATA, source_id, new_sensor_values)
+
+    def emit(self, tp, v1, v2):
+        self.fd.write(chr(tp) + pack(v1, False) + pack(v2))
+
+    def readall(self):
+        tp = self.fd.read(1)
+        if ord(tp) == self.NEW_DATA:
+            pass
+        elif ord(tp) == self.NEW_SENSOR:
+            pass
+        elif ord(tp) == self.NEW_SOURCE:
+            pass
+        else:
+            raise ValueError()
diff --git a/wally/sensors/storage/grafana.py b/wally/sensors/storage/grafana.py
new file mode 100644
index 0000000..9823fac
--- /dev/null
+++ b/wally/sensors/storage/grafana.py
@@ -0,0 +1,47 @@
+import json
+
+
+query = """
+select value from "{series}"
+where $timeFilter and
+host='{host}' and device='{device}'
+order asc
+"""
+
+
+def make_dashboard_file(config):
+    series = ['writes_completed', 'sectors_written']
+    dashboards = []
+
+    for serie in series:
+        dashboard = dict(title=serie, type='graph',
+                         span=12, fill=1, linewidth=2,
+                         tooltip={'shared': True})
+
+        targets = []
+
+        for ip, devs in config.items():
+            for device in devs:
+                params = {
+                    'series': serie,
+                    'host': ip,
+                    'device': device
+                }
+
+                target = dict(
+                    target="disk io",
+                    query=query.replace("\n", " ").format(**params).strip(),
+                    interval="",
+                    alias="{0} io {1}".format(ip, device),
+                    rawQuery=True
+                )
+                targets.append(target)
+
+        dashboard['targets'] = targets
+        dashboards.append(dashboard)
+
+    fc = open("grafana_template.js").read()
+    return fc % (json.dumps(dashboards),)
+
+
+if __name__ == "__main__":
+    print make_dashboard_file({'192.168.0.104': ['sda1', 'rbd1']})
diff --git a/wally/sensors/storage/grafana_template.js b/wally/sensors/storage/grafana_template.js
new file mode 100644
index 0000000..7c57924
--- /dev/null
+++ b/wally/sensors/storage/grafana_template.js
@@ -0,0 +1,46 @@
+/* global _ */
+
+/*
+ * Complex scripted dashboard
+ * This script generates a dashboard object that Grafana can load. It also
+ * takes a number of user-supplied URL parameters (in the ARGS variable)
+ *
+ * Return a dashboard object, or a function
+ *
+ * For async scripts, return a function; this function must take a single
+ * callback function as argument, and call this callback function with the
+ * dashboard object (look at scripted_async.js for an example)
+ */
+
+
+
+// accessible variables in this scope
+var window, document, ARGS, $, jQuery, moment, kbn;
+
+// Setup some variables
+var dashboard;
+
+// All url parameters are available via the ARGS object
+var ARGS;
+
+// Initialize a skeleton with nothing but a rows array and service object
+dashboard = {rows : []};
+
+// Set a title
+dashboard.title = 'Tests dash';
+
+// Set default time
+// time can be overridden in the url using from/to parameters, but this is
+// handled automatically in grafana core during dashboard initialization
+dashboard.time = {
+    from: "now-5m",
+    to: "now"
+};
+
+dashboard.rows.push({
+    title: 'Chart',
+    height: '300px',
+    panels: %s
+});
+
+
+return dashboard;
diff --git a/wally/sensors/storage/influx_exporter.py b/wally/sensors/storage/influx_exporter.py
new file mode 100644
index 0000000..34b3c0a
--- /dev/null
+++ b/wally/sensors/storage/influx_exporter.py
@@ -0,0 +1,31 @@
+from urlparse import urlparse
+from influxdb import InfluxDBClient
+
+
+def connect(url):
+    parsed_url = urlparse(url)
+    user_passwd, host_port = parsed_url.netloc.rsplit("@", 1)
+    user, passwd = user_passwd.split(":", 1)
+    host, port = host_port.split(":")
+    return InfluxDBClient(host, int(port), user, passwd, parsed_url.path[1:])
+
+
+def add_data(conn, hostname, data):
+    per_sensor_data = {}
+    for serie in data:
+        serie = serie.copy()
+        gtime = serie.pop('time')
+        for key, val in serie.items():
+            dev, sensor = key.split('.')
+            # accumulate without shadowing the 'data' argument
+            per_sensor_data.setdefault(sensor, []).append(
+                [gtime, hostname, dev, val])
+
+    infl_data = []
+    columns = ['time', 'host', 'device', 'value']
+    for sensor_name, points in per_sensor_data.items():
+        infl_data.append(
+            {'columns': columns,
+             'name': sensor_name,
+             'points': points})
+
+    conn.write_points(infl_data)
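connect() expects an http://user:password@host:port/database URL, and
add_data() takes samples keyed as 'device.sensor' plus a 'time' field,
regrouping them into one InfluxDB series per sensor. A usage sketch; the
credentials, host, and values below are made up:

    conn = connect("http://wally:secret@127.0.0.1:8086/sensors")

    samples = [
        {'time': 1429000000, 'sda1.writes_completed': 12,
         'sda1.sectors_written': 96},
        {'time': 1429000001, 'sda1.writes_completed': 17,
         'sda1.sectors_written': 136},
    ]
    add_data(conn, 'ceph-osd0', samples)  # one point per (time, device, sensor)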
diff --git a/wally/sensors/storage/koder.js b/wally/sensors/storage/koder.js
new file mode 100644
index 0000000..a65a454
--- /dev/null
+++ b/wally/sensors/storage/koder.js
@@ -0,0 +1,47 @@
+/* global _ */
+
+/*
+ * Complex scripted dashboard
+ * This script generates a dashboard object that Grafana can load. It also takes a number of
+ * user-supplied URL parameters (in the ARGS variable).
+ *
+ * Return a dashboard object, or a function.
+ *
+ * For async scripts, return a function; it must take a single callback function as its argument
+ * and call that callback with the dashboard object (see scripted_async.js for an example).
+ */
+
+
+
+// accessible variables in this scope
+var window, document, ARGS, $, jQuery, moment, kbn;
+
+// Setup some variables
+var dashboard;
+
+// All url parameters are available via the ARGS object
+var ARGS;
+
+// Initialize a skeleton with nothing but a rows array and service object
+dashboard = {rows : []};
+
+// Set a title
+dashboard.title = 'Tests dash';
+
+// Set default time
+// time can be overridden in the url using from/to parameters, but this is
+// handled automatically in grafana core during dashboard initialization
+dashboard.time = {
+    from: "now-5m",
+    to: "now"
+};
+
+dashboard.rows.push({
+    title: 'Chart',
+    height: '300px',
+    panels: [{"span": 12, "title": "writes_completed", "linewidth": 2, "type": "graph", "targets": [{"alias": "192.168.0.104 io sda1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"writes_completed\" where $timeFilter and host='192.168.0.104' and device='sda1' order asc"}, {"alias": "192.168.0.104 io rbd1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"writes_completed\" where $timeFilter and host='192.168.0.104' and device='rbd1' order asc"}], "tooltip": {"shared": true}, "fill": 1}, {"span": 12, "title": "sectors_written", "linewidth": 2, "type": "graph", "targets": [{"alias": "192.168.0.104 io sda1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"sectors_written\" where $timeFilter and host='192.168.0.104' and device='sda1' order asc"}, {"alias": "192.168.0.104 io rbd1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"sectors_written\" where $timeFilter and host='192.168.0.104' and device='rbd1' order asc"}], "tooltip": {"shared": true}, "fill": 1}]
+});
+
+
+return dashboard;
+
diff --git a/wally/sensors/umsgpack.py b/wally/sensors/umsgpack.py
new file mode 100644
index 0000000..0cdc83e
--- /dev/null
+++ b/wally/sensors/umsgpack.py
@@ -0,0 +1,879 @@
+# u-msgpack-python v2.0 - vsergeev at gmail
+# https://github.com/vsergeev/u-msgpack-python
+#
+# u-msgpack-python is a lightweight MessagePack serializer and deserializer
+# module, compatible with both Python 2 and 3, as well as CPython and PyPy
+# implementations of Python. u-msgpack-python is fully compliant with the
+# latest MessagePack specification
+# (https://github.com/msgpack/msgpack/blob/master/spec.md). In particular,
+# it supports the new binary, UTF-8 string, and application ext types.
+#
+# MIT License
+#
+# Copyright (c) 2013-2014 Ivan A. Sergeev
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+"""
+u-msgpack-python v2.0 - vsergeev at gmail
+https://github.com/vsergeev/u-msgpack-python
+
+u-msgpack-python is a lightweight MessagePack serializer and deserializer
+module, compatible with both Python 2 and 3, as well as CPython and PyPy
+implementations of Python. u-msgpack-python is fully compliant with the
+latest MessagePack specification
+(https://github.com/msgpack/msgpack/blob/master/spec.md). In particular,
+it supports the new binary, UTF-8 string, and application ext types.
+
+License: MIT
+"""
+
+__version__ = "2.0"
+"Module version string"
+
+version = (2, 0)
+"Module version tuple"
+
+import struct
+import collections
+import sys
+import io
+
+################################################################################
+### Ext Class
+################################################################################
+
+# Extension type for application-defined types and data
+class Ext:
+    """
+    The Ext class facilitates creating a serializable extension object to store
+    an application-defined type and data byte array.
+    """
+
+    def __init__(self, type, data):
+        """
+        Construct a new Ext object.
+
+        Args:
+            type: application-defined type integer from 0 to 127
+            data: application-defined data byte array
+
+        Raises:
+            TypeError:
+                Specified ext type is outside of 0 to 127 range.
+
+        Example:
+        >>> foo = umsgpack.Ext(0x05, b"\x01\x02\x03")
+        >>> umsgpack.packb({u"special stuff": foo, u"awesome": True})
+        '\x82\xa7awesome\xc3\xadspecial stuff\xc7\x03\x05\x01\x02\x03'
+        >>> bar = umsgpack.unpackb(_)
+        >>> print(bar["special stuff"])
+        Ext Object (Type: 0x05, Data: 01 02 03)
+        >>>
+        """
+        # Application ext type should be 0 <= type <= 127
+        if not isinstance(type, int) or not (type >= 0 and type <= 127):
+            raise TypeError("ext type out of range")
+        # Check data is type bytes
+        elif sys.version_info[0] == 3 and not isinstance(data, bytes):
+            raise TypeError("ext data is not type \'bytes\'")
+        elif sys.version_info[0] == 2 and not isinstance(data, str):
+            raise TypeError("ext data is not type \'str\'")
+        self.type = type
+        self.data = data
+
+    def __eq__(self, other):
+        """
+        Compare this Ext object with another for equality.
+        """
+        return (isinstance(other, self.__class__) and
+                self.type == other.type and
+                self.data == other.data)
+
+    def __ne__(self, other):
+        """
+        Compare this Ext object with another for inequality.
+        """
+        return not self.__eq__(other)
+
+    def __str__(self):
+        """
+        String representation of this Ext object.
+        """
+        s = "Ext Object (Type: 0x%02x, Data: " % self.type
+        for i in range(min(len(self.data), 8)):
+            if i > 0:
+                s += " "
+            if isinstance(self.data[i], int):
+                s += "%02x" % (self.data[i])
+            else:
+                s += "%02x" % ord(self.data[i])
+        if len(self.data) > 8:
+            s += " ..."
+        s += ")"
+        return s
+
+################################################################################
+### Exceptions
+################################################################################
+
+# Base Exception classes
+class PackException(Exception):
+    "Base class for exceptions encountered during packing."
+    pass
+class UnpackException(Exception):
+    "Base class for exceptions encountered during unpacking."
+    pass
+
+# Packing error
+class UnsupportedTypeException(PackException):
+    "Object type not supported for packing."
+    pass
+
+# Unpacking error
+class InsufficientDataException(UnpackException):
+    "Insufficient data to unpack the encoded object."
+    pass
+class InvalidStringException(UnpackException):
+    "Invalid UTF-8 string encountered during unpacking."
+    pass
+class ReservedCodeException(UnpackException):
+    "Reserved code encountered during unpacking."
+    pass
+class UnhashableKeyException(UnpackException):
+    """
+    Unhashable key encountered during map unpacking.
+    The serialized map cannot be deserialized into a Python dictionary.
+    """
+    pass
+class DuplicateKeyException(UnpackException):
+    "Duplicate key encountered during map unpacking."
+    pass
+
+# Backwards compatibility
+KeyNotPrimitiveException = UnhashableKeyException
+KeyDuplicateException = DuplicateKeyException
+
+################################################################################
+### Exported Functions and Globals
+################################################################################
+
+# Exported functions and variables, set up in __init()
+pack = None
+packb = None
+unpack = None
+unpackb = None
+dump = None
+dumps = None
+load = None
+loads = None
+
+compatibility = False
+"""
+Compatibility mode boolean.
+
+When compatibility mode is enabled, u-msgpack-python will serialize both
+unicode strings and bytes into the old "raw" msgpack type, and deserialize the
+"raw" msgpack type into bytes. This provides backwards compatibility with the
+old MessagePack specification.
+
+Example:
+>>> umsgpack.compatibility = True
+>>>
+>>> umsgpack.packb([u"some string", b"some bytes"])
+b'\x92\xabsome string\xaasome bytes'
+>>> umsgpack.unpackb(_)
+[b'some string', b'some bytes']
+>>>
+"""
+
+################################################################################
+### Packing
+################################################################################
+
+# You may notice struct.pack("B", obj) instead of the simpler chr(obj) in the
+# code below. This is to allow for seamless Python 2 and 3 compatibility, as
+# chr(obj) has a str return type instead of bytes in Python 3, and
+# struct.pack(...) has the right return type in both versions.
+
+def _pack_integer(obj, fp):
+    if obj < 0:
+        if obj >= -32:
+            fp.write(struct.pack("b", obj))
+        elif obj >= -2**(8-1):
+            fp.write(b"\xd0" + struct.pack("b", obj))
+        elif obj >= -2**(16-1):
+            fp.write(b"\xd1" + struct.pack(">h", obj))
+        elif obj >= -2**(32-1):
+            fp.write(b"\xd2" + struct.pack(">i", obj))
+        elif obj >= -2**(64-1):
+            fp.write(b"\xd3" + struct.pack(">q", obj))
+        else:
+            raise UnsupportedTypeException("huge signed int")
+    else:
+        if obj <= 127:
+            fp.write(struct.pack("B", obj))
+        elif obj <= 2**8-1:
+            fp.write(b"\xcc" + struct.pack("B", obj))
+        elif obj <= 2**16-1:
+            fp.write(b"\xcd" + struct.pack(">H", obj))
+        elif obj <= 2**32-1:
+            fp.write(b"\xce" + struct.pack(">I", obj))
+        elif obj <= 2**64-1:
+            fp.write(b"\xcf" + struct.pack(">Q", obj))
+        else:
+            raise UnsupportedTypeException("huge unsigned int")
+
+def _pack_nil(obj, fp):
+    fp.write(b"\xc0")
+
+def _pack_boolean(obj, fp):
+    fp.write(b"\xc3" if obj else b"\xc2")
+
+def _pack_float(obj, fp):
+    if _float_size == 64:
+        fp.write(b"\xcb" + struct.pack(">d", obj))
+    else:
+        fp.write(b"\xca" + struct.pack(">f", obj))
+
+def _pack_string(obj, fp):
+    obj = obj.encode('utf-8')
+    if len(obj) <= 31:
+        fp.write(struct.pack("B", 0xa0 | len(obj)) + obj)
+    elif len(obj) <= 2**8-1:
+        fp.write(b"\xd9" + struct.pack("B", len(obj)) + obj)
+    elif len(obj) <= 2**16-1:
+        fp.write(b"\xda" + struct.pack(">H", len(obj)) + obj)
+    elif len(obj) <= 2**32-1:
+        fp.write(b"\xdb" + struct.pack(">I", len(obj)) + obj)
+    else:
+        raise UnsupportedTypeException("huge string")
+
+def _pack_binary(obj, fp):
+    if len(obj) <= 2**8-1:
+        fp.write(b"\xc4" + struct.pack("B", len(obj)) + obj)
+    elif len(obj) <= 2**16-1:
+        fp.write(b"\xc5" + struct.pack(">H", len(obj)) + obj)
+    elif len(obj) <= 2**32-1:
+        fp.write(b"\xc6" + struct.pack(">I", len(obj)) + obj)
+    else:
+        raise UnsupportedTypeException("huge binary string")
+
+def _pack_oldspec_raw(obj, fp):
+    if len(obj) <= 31:
+        fp.write(struct.pack("B", 0xa0 | len(obj)) + obj)
+    elif len(obj) <= 2**16-1:
+        fp.write(b"\xda" + struct.pack(">H", len(obj)) + obj)
+    elif len(obj) <= 2**32-1:
+        fp.write(b"\xdb" + struct.pack(">I", len(obj)) + obj)
+    else:
+        raise UnsupportedTypeException("huge raw string")
+
+def _pack_ext(obj, fp):
+    if len(obj.data) == 1:
+        fp.write(b"\xd4" + struct.pack("B", obj.type & 0xff) + obj.data)
+    elif len(obj.data) == 2:
+        fp.write(b"\xd5" + struct.pack("B", obj.type & 0xff) + obj.data)
+    elif len(obj.data) == 4:
+        fp.write(b"\xd6" + struct.pack("B", obj.type & 0xff) + obj.data)
+    elif len(obj.data) == 8:
+        fp.write(b"\xd7" + struct.pack("B", obj.type & 0xff) + obj.data)
+    elif len(obj.data) == 16:
+        fp.write(b"\xd8" + struct.pack("B", obj.type & 0xff) + obj.data)
+    elif len(obj.data) <= 2**8-1:
+        fp.write(b"\xc7" + struct.pack("BB", len(obj.data), obj.type & 0xff) + obj.data)
+    elif len(obj.data) <= 2**16-1:
+        fp.write(b"\xc8" + struct.pack(">HB", len(obj.data), obj.type & 0xff) + obj.data)
+    elif len(obj.data) <= 2**32-1:
+        fp.write(b"\xc9" + struct.pack(">IB", len(obj.data), obj.type & 0xff) + obj.data)
+    else:
+        raise UnsupportedTypeException("huge ext data")
+
+def _pack_array(obj, fp):
+    if len(obj) <= 15:
+        fp.write(struct.pack("B", 0x90 | len(obj)))
+    elif len(obj) <= 2**16-1:
+        fp.write(b"\xdc" + struct.pack(">H", len(obj)))
+    elif len(obj) <= 2**32-1:
+        fp.write(b"\xdd" + struct.pack(">I", len(obj)))
+    else:
+        raise UnsupportedTypeException("huge array")
+
+    for e in obj:
+        pack(e, fp)
+
+def _pack_map(obj, fp):
+    if len(obj) <= 15:
+        fp.write(struct.pack("B", 0x80 | len(obj)))
+    elif len(obj) <= 2**16-1:
+        fp.write(b"\xde" + struct.pack(">H", len(obj)))
+    elif len(obj) <= 2**32-1:
+        fp.write(b"\xdf" + struct.pack(">I", len(obj)))
+    else:
+        raise UnsupportedTypeException("huge map")
+
+    for k,v in obj.items():
+        pack(k, fp)
+        pack(v, fp)
+
+########################################
+
+# Pack for Python 2, with 'unicode' type, 'str' type, and 'long' type
+def _pack2(obj, fp):
+    """
+    Serialize a Python object into MessagePack bytes.
+
+    Args:
+        obj: a Python object
+        fp: a .write()-supporting file-like object
+
+    Returns:
+        None.
+
+    Raises:
+        UnsupportedType(PackException):
+            Object type not supported for packing.
+
+    Example:
+    >>> f = open('test.bin', 'wb')
+    >>> umsgpack.pack({u"compact": True, u"schema": 0}, f)
+    >>>
+    """
+
+    global compatibility
+
+    if obj is None:
+        _pack_nil(obj, fp)
+    elif isinstance(obj, bool):
+        _pack_boolean(obj, fp)
+    elif isinstance(obj, int) or isinstance(obj, long):
+        _pack_integer(obj, fp)
+    elif isinstance(obj, float):
+        _pack_float(obj, fp)
+    elif compatibility and isinstance(obj, unicode):
+        _pack_oldspec_raw(bytes(obj), fp)
+    elif compatibility and isinstance(obj, bytes):
+        _pack_oldspec_raw(obj, fp)
+    elif isinstance(obj, unicode):
+        _pack_string(obj, fp)
+    elif isinstance(obj, str):
+        _pack_binary(obj, fp)
+    elif isinstance(obj, list) or isinstance(obj, tuple):
+        _pack_array(obj, fp)
+    elif isinstance(obj, dict):
+        _pack_map(obj, fp)
+    elif isinstance(obj, Ext):
+        _pack_ext(obj, fp)
+    else:
+        raise UnsupportedTypeException("unsupported type: %s" % str(type(obj)))
+
+# Pack for Python 3, with unicode 'str' type, 'bytes' type, and no 'long' type
+def _pack3(obj, fp):
+    """
+    Serialize a Python object into MessagePack bytes.
+
+    Args:
+        obj: a Python object
+        fp: a .write()-supporting file-like object
+
+    Returns:
+        None.
+
+    Raises:
+        UnsupportedType(PackException):
+            Object type not supported for packing.
+
+    Example:
+    >>> f = open('test.bin', 'wb')
+    >>> umsgpack.pack({u"compact": True, u"schema": 0}, f)
+    >>>
+    """
+    global compatibility
+
+    if obj is None:
+        _pack_nil(obj, fp)
+    elif isinstance(obj, bool):
+        _pack_boolean(obj, fp)
+    elif isinstance(obj, int):
+        _pack_integer(obj, fp)
+    elif isinstance(obj, float):
+        _pack_float(obj, fp)
+    elif compatibility and isinstance(obj, str):
+        _pack_oldspec_raw(obj.encode('utf-8'), fp)
+    elif compatibility and isinstance(obj, bytes):
+        _pack_oldspec_raw(obj, fp)
+    elif isinstance(obj, str):
+        _pack_string(obj, fp)
+    elif isinstance(obj, bytes):
+        _pack_binary(obj, fp)
+    elif isinstance(obj, list) or isinstance(obj, tuple):
+        _pack_array(obj, fp)
+    elif isinstance(obj, dict):
+        _pack_map(obj, fp)
+    elif isinstance(obj, Ext):
+        _pack_ext(obj, fp)
+    else:
+        raise UnsupportedTypeException("unsupported type: %s" % str(type(obj)))
+
+def _packb2(obj):
+    """
+    Serialize a Python object into MessagePack bytes.
+
+    Args:
+        obj: a Python object
+
+    Returns:
+        A 'str' containing serialized MessagePack bytes.
+
+    Raises:
+        UnsupportedType(PackException):
+            Object type not supported for packing.
+
+    Example:
+    >>> umsgpack.packb({u"compact": True, u"schema": 0})
+    '\x82\xa7compact\xc3\xa6schema\x00'
+    >>>
+    """
+    fp = io.BytesIO()
+    _pack2(obj, fp)
+    return fp.getvalue()
+
+def _packb3(obj):
+    """
+    Serialize a Python object into MessagePack bytes.
+
+    Args:
+        obj: a Python object
+
+    Returns:
+        A 'bytes' containing serialized MessagePack bytes.
+
+    Raises:
+        UnsupportedType(PackException):
+            Object type not supported for packing.
+
+    Example:
+    >>> umsgpack.packb({u"compact": True, u"schema": 0})
+    b'\x82\xa7compact\xc3\xa6schema\x00'
+    >>>
+    """
+    fp = io.BytesIO()
+    _pack3(obj, fp)
+    return fp.getvalue()
+
+################################################################################
+### Unpacking
+################################################################################
+
+def _read_except(fp, n):
+    data = fp.read(n)
+    if len(data) < n:
+        raise InsufficientDataException()
+    return data
+
+def _unpack_integer(code, fp):
+    if (ord(code) & 0xe0) == 0xe0:
+        return struct.unpack("b", code)[0]
+    elif code == b'\xd0':
+        return struct.unpack("b", _read_except(fp, 1))[0]
+    elif code == b'\xd1':
+        return struct.unpack(">h", _read_except(fp, 2))[0]
+    elif code == b'\xd2':
+        return struct.unpack(">i", _read_except(fp, 4))[0]
+    elif code == b'\xd3':
+        return struct.unpack(">q", _read_except(fp, 8))[0]
+    elif (ord(code) & 0x80) == 0x00:
+        return struct.unpack("B", code)[0]
+    elif code == b'\xcc':
+        return struct.unpack("B", _read_except(fp, 1))[0]
+    elif code == b'\xcd':
+        return struct.unpack(">H", _read_except(fp, 2))[0]
+    elif code == b'\xce':
+        return struct.unpack(">I", _read_except(fp, 4))[0]
+    elif code == b'\xcf':
+        return struct.unpack(">Q", _read_except(fp, 8))[0]
+    raise Exception("logic error, not int: 0x%02x" % ord(code))
+
+def _unpack_reserved(code, fp):
+    if code == b'\xc1':
+        raise ReservedCodeException("encountered reserved code: 0x%02x" % ord(code))
+    raise Exception("logic error, not reserved code: 0x%02x" % ord(code))
+
+def _unpack_nil(code, fp):
+    if code == b'\xc0':
+        return None
+    raise Exception("logic error, not nil: 0x%02x" % ord(code))
+
+def _unpack_boolean(code, fp):
+    if code == b'\xc2':
+        return False
+    elif code == b'\xc3':
+        return True
+    raise Exception("logic error, not boolean: 0x%02x" % ord(code))
+
+def _unpack_float(code, fp):
+    if code == b'\xca':
+        return struct.unpack(">f", _read_except(fp, 4))[0]
+    elif code == b'\xcb':
+        return struct.unpack(">d", _read_except(fp, 8))[0]
+    raise Exception("logic error, not float: 0x%02x" % ord(code))
+
+def _unpack_string(code, fp):
+    if (ord(code) & 0xe0) == 0xa0:
+        length = ord(code) & ~0xe0
+    elif code == b'\xd9':
+        length = struct.unpack("B", _read_except(fp, 1))[0]
+    elif code == b'\xda':
+        length = struct.unpack(">H", _read_except(fp, 2))[0]
+    elif code == b'\xdb':
+        length = struct.unpack(">I", _read_except(fp, 4))[0]
+    else:
+        raise Exception("logic error, not string: 0x%02x" % ord(code))
+
+    # Always return raw bytes in compatibility mode
+    global compatibility
+    if compatibility:
+        return _read_except(fp, length)
+
+    try:
+        return bytes.decode(_read_except(fp, length), 'utf-8')
+    except UnicodeDecodeError:
+        raise InvalidStringException("unpacked string is not utf-8")
+
+def _unpack_binary(code, fp):
+    if code == b'\xc4':
+        length = struct.unpack("B", _read_except(fp, 1))[0]
+    elif code == b'\xc5':
+        length = struct.unpack(">H", _read_except(fp, 2))[0]
+    elif code == b'\xc6':
+        length = struct.unpack(">I", _read_except(fp, 4))[0]
+    else:
+        raise Exception("logic error, not binary: 0x%02x" % ord(code))
+
+    return _read_except(fp, length)
+
+def _unpack_ext(code, fp):
+    if code == b'\xd4':
+        length = 1
+    elif code == b'\xd5':
+        length = 2
+    elif code == b'\xd6':
+        length = 4
+    elif code == b'\xd7':
+        length = 8
+    elif code == b'\xd8':
+        length = 16
+    elif code == b'\xc7':
+        length = struct.unpack("B", _read_except(fp, 1))[0]
+    elif code == b'\xc8':
+        length = struct.unpack(">H", _read_except(fp, 2))[0]
+    elif code == b'\xc9':
+        length = struct.unpack(">I", _read_except(fp, 4))[0]
+    else:
+        raise Exception("logic error, not ext: 0x%02x" % ord(code))
+
+    return Ext(ord(_read_except(fp, 1)), _read_except(fp, length))
+
+def _unpack_array(code, fp):
+    if (ord(code) & 0xf0) == 0x90:
+        length = (ord(code) & ~0xf0)
+    elif code == b'\xdc':
+        length = struct.unpack(">H", _read_except(fp, 2))[0]
+    elif code == b'\xdd':
+        length = struct.unpack(">I", _read_except(fp, 4))[0]
+    else:
+        raise Exception("logic error, not array: 0x%02x" % ord(code))
+
+    return [_unpack(fp) for i in range(length)]
+
+def _deep_list_to_tuple(obj):
+    if isinstance(obj, list):
+        return tuple([_deep_list_to_tuple(e) for e in obj])
+    return obj
+
+def _unpack_map(code, fp):
+    if (ord(code) & 0xf0) == 0x80:
+        length = (ord(code) & ~0xf0)
+    elif code == b'\xde':
+        length = struct.unpack(">H", _read_except(fp, 2))[0]
+    elif code == b'\xdf':
+        length = struct.unpack(">I", _read_except(fp, 4))[0]
+    else:
+        raise Exception("logic error, not map: 0x%02x" % ord(code))
+
+    d = {}
+    for i in range(length):
+        # Unpack key
+        k = _unpack(fp)
+
+        if isinstance(k, list):
+            # Attempt to convert list into a hashable tuple
+            k = _deep_list_to_tuple(k)
+        elif not isinstance(k, collections.Hashable):
+            raise UnhashableKeyException("encountered unhashable key: %s, %s" % (str(k), str(type(k))))
+        elif k in d:
+            raise DuplicateKeyException("encountered duplicate key: %s, %s" % (str(k), str(type(k))))
+
+        # Unpack value
+        v = _unpack(fp)
+
+        try:
+            d[k] = v
+        except TypeError:
+            raise UnhashableKeyException("encountered unhashable key: %s" % str(k))
+    return d
+
+def _unpack(fp):
+    code = _read_except(fp, 1)
+    return _unpack_dispatch_table[code](code, fp)
+
+########################################
+
+def _unpack2(fp):
+    """
+    Deserialize MessagePack bytes into a Python object.
+
+    Args:
+        fp: a .read()-supporting file-like object
+
+    Returns:
+        A Python object.
+
+    Raises:
+        InsufficientDataException(UnpackException):
+            Insufficient data to unpack the encoded object.
+        InvalidStringException(UnpackException):
+            Invalid UTF-8 string encountered during unpacking.
+        ReservedCodeException(UnpackException):
+            Reserved code encountered during unpacking.
+        UnhashableKeyException(UnpackException):
+            Unhashable key encountered during map unpacking.
+            The serialized map cannot be deserialized into a Python dictionary.
+        DuplicateKeyException(UnpackException):
+            Duplicate key encountered during map unpacking.
+
+    Example:
+    >>> f = open("test.bin")
+    >>> umsgpack.unpackb(f)
+    {u'compact': True, u'schema': 0}
+    >>>
+    """
+    return _unpack(fp)
+
+def _unpack3(fp):
+    """
+    Deserialize MessagePack bytes into a Python object.
+
+    Args:
+        fp: a .read()-supporting file-like object
+
+    Returns:
+        A Python object.
+
+    Raises:
+        InsufficientDataException(UnpackException):
+            Insufficient data to unpack the encoded object.
+        InvalidStringException(UnpackException):
+            Invalid UTF-8 string encountered during unpacking.
+        ReservedCodeException(UnpackException):
+            Reserved code encountered during unpacking.
+        UnhashableKeyException(UnpackException):
+            Unhashable key encountered during map unpacking.
+            The serialized map cannot be deserialized into a Python dictionary.
+        DuplicateKeyException(UnpackException):
+            Duplicate key encountered during map unpacking.
+
+    Example:
+    >>> f = open("test.bin")
+    >>> umsgpack.unpackb(f)
+    {'compact': True, 'schema': 0}
+    >>>
+    """
+    return _unpack(fp)
+
+# For Python 2, expects a str object
+def _unpackb2(s):
+    """
+    Deserialize MessagePack bytes into a Python object.
+
+    Args:
+        s: a 'str' containing serialized MessagePack bytes
+
+    Returns:
+        A Python object.
+
+    Raises:
+        TypeError:
+            Packed data is not type 'str'.
+        InsufficientDataException(UnpackException):
+            Insufficient data to unpack the encoded object.
+        InvalidStringException(UnpackException):
+            Invalid UTF-8 string encountered during unpacking.
+        ReservedCodeException(UnpackException):
+            Reserved code encountered during unpacking.
+        UnhashableKeyException(UnpackException):
+            Unhashable key encountered during map unpacking.
+            The serialized map cannot be deserialized into a Python dictionary.
+        DuplicateKeyException(UnpackException):
+            Duplicate key encountered during map unpacking.
+
+    Example:
+    >>> umsgpack.unpackb(b'\x82\xa7compact\xc3\xa6schema\x00')
+    {u'compact': True, u'schema': 0}
+    >>>
+    """
+    if not isinstance(s, str):
+        raise TypeError("packed data is not type 'str'")
+    return _unpack(io.BytesIO(s))
+
+# For Python 3, expects a bytes object
+def _unpackb3(s):
+    """
+    Deserialize MessagePack bytes into a Python object.
+
+    Args:
+        s: a 'bytes' containing serialized MessagePack bytes
+
+    Returns:
+        A Python object.
+
+    Raises:
+        TypeError:
+            Packed data is not type 'bytes'.
+        InsufficientDataException(UnpackException):
+            Insufficient data to unpack the encoded object.
+        InvalidStringException(UnpackException):
+            Invalid UTF-8 string encountered during unpacking.
+        ReservedCodeException(UnpackException):
+            Reserved code encountered during unpacking.
+        UnhashableKeyException(UnpackException):
+            Unhashable key encountered during map unpacking.
+            The serialized map cannot be deserialized into a Python dictionary.
+        DuplicateKeyException(UnpackException):
+            Duplicate key encountered during map unpacking.
+
+    Example:
+    >>> umsgpack.unpackb(b'\x82\xa7compact\xc3\xa6schema\x00')
+    {'compact': True, 'schema': 0}
+    >>>
+    """
+    if not isinstance(s, bytes):
+        raise TypeError("packed data is not type 'bytes'")
+    return _unpack(io.BytesIO(s))
+
+################################################################################
+### Module Initialization
+################################################################################
+
+def __init():
+    global pack
+    global packb
+    global unpack
+    global unpackb
+    global dump
+    global dumps
+    global load
+    global loads
+    global compatibility
+    global _float_size
+    global _unpack_dispatch_table
+
+    # Compatibility mode for handling strings/bytes with the old specification
+    compatibility = False
+
+    # Auto-detect system float precision
+    if sys.float_info.mant_dig == 53:
+        _float_size = 64
+    else:
+        _float_size = 32
+
+    # Map packb and unpackb to the appropriate version
+    if sys.version_info[0] == 3:
+        pack = _pack3
+        packb = _packb3
+        dump = _pack3
+        dumps = _packb3
+        unpack = _unpack3
+        unpackb = _unpackb3
+        load = _unpack3
+        loads = _unpackb3
+    else:
+        pack = _pack2
+        packb = _packb2
+        dump = _pack2
+        dumps = _packb2
+        unpack = _unpack2
+        unpackb = _unpackb2
+        load = _unpack2
+        loads = _unpackb2
+
+    # Build a dispatch table for fast lookup of unpacking function
+
+    _unpack_dispatch_table = {}
+    # Fix uint
+    for code in range(0, 0x7f+1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
+    # Fix map
+    for code in range(0x80, 0x8f+1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_map
+    # Fix array
+    for code in range(0x90, 0x9f+1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_array
+    # Fix str
+    for code in range(0xa0, 0xbf+1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_string
+    # Nil
+    _unpack_dispatch_table[b'\xc0'] = _unpack_nil
+    # Reserved
+    _unpack_dispatch_table[b'\xc1'] = _unpack_reserved
+    # Boolean
+    _unpack_dispatch_table[b'\xc2'] = _unpack_boolean
+    _unpack_dispatch_table[b'\xc3'] = _unpack_boolean
+    # Bin
+    for code in range(0xc4, 0xc6+1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_binary
+    # Ext
+    for code in range(0xc7, 0xc9+1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_ext
+    # Float
+    _unpack_dispatch_table[b'\xca'] = _unpack_float
+    _unpack_dispatch_table[b'\xcb'] = _unpack_float
+    # Uint
+    for code in range(0xcc, 0xcf+1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
+    # Int
+    for code in range(0xd0, 0xd3+1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
+    # Fixext
+    for code in range(0xd4, 0xd8+1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_ext
+    # String
+    for code in range(0xd9, 0xdb+1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_string
+    # Array
+    _unpack_dispatch_table[b'\xdc'] = _unpack_array
+    _unpack_dispatch_table[b'\xdd'] = _unpack_array
+    # Map
+    _unpack_dispatch_table[b'\xde'] = _unpack_map
+    _unpack_dispatch_table[b'\xdf'] = _unpack_map
+    # Negative fixint
+    for code in range(0xe0, 0xff+1):
+        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
+
+__init()
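After __init() runs at import time, pack/packb/unpack/unpackb (and their
dump/dumps/load/loads aliases) are bound to the Python 2 or Python 3
implementations as appropriate. A minimal round trip, assuming the vendored
module is importable as wally.sensors.umsgpack:

    from wally.sensors import umsgpack

    blob = umsgpack.packb({u"compact": True, u"schema": 0})
    print(repr(blob))               # '\x82\xa7compact\xc3\xa6schema\x00'
    print(umsgpack.unpackb(blob))   # {u'compact': True, u'schema': 0}

    # integer boundaries follow _pack_integer: 127 still fits a positive
    # fixint, while 128 needs the 0xcc (uint 8) prefix
    assert umsgpack.packb(127) == b'\x7f'
    assert umsgpack.packb(128) == b'\xcc\x80'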