Merge pull request #4 from Mirantis/sender

improvement of protocol
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
index 6eed90a..ea73789 100644
--- a/wally/sensors/api.py
+++ b/wally/sensors/api.py
@@ -20,7 +20,8 @@
 
 
 class SensorConfig(object):
-    def __init__(self, conn, url, sensors, source_id, monitor_url=None):
+    def __init__(self, conn, url, sensors, source_id,
+                 monitor_url=None):
         self.conn = conn
         self.url = url
         self.sensors = sensors
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
index 2b6ab8b..cdbe9fc 100644
--- a/wally/sensors/main.py
+++ b/wally/sensors/main.py
@@ -47,14 +47,14 @@
 
 
 def daemon_main(required_sensors, opts):
-    sender = create_protocol(opts.url)
-    prev = {}
-
     try:
         source_id = str(required_sensors.pop('source_id'))
     except KeyError:
         source_id = None
 
+    sender = create_protocol(opts.url)
+    prev = {}
+
     while True:
         gtime, data = get_values(required_sensors.items())
         curr = {'time': SensorInfo(gtime, True)}
@@ -70,6 +70,7 @@
             curr['source_id'] = source_id
 
         sender.send(curr)
+
         time.sleep(opts.timeout)
 
 
@@ -89,6 +90,9 @@
         if opts.daemon == 'start':
             required_sensors = json.loads(opts.sensors_config.read())
 
+            if "protocol_data" not in required_sensors:
+                raise ValueError("No protocol data provided in config")
+
             def root_func():
                 daemon_main(required_sensors, opts)
 
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
index a67ddde..9fd1a84 100644
--- a/wally/sensors/protocol.py
+++ b/wally/sensors/protocol.py
@@ -1,5 +1,6 @@
 import sys
 import time
+import struct
 import socket
 import select
 import cPickle as pickle
@@ -23,6 +24,89 @@
         pass
 
 
+class StructSerializer(ISensortResultsSerializer):
+    class LocalConfig(object):
+        def __init__(self):
+            self.last_format_sent = -1
+            self.initial_sent = False
+            self.initial_times = 5
+            self.field_order = None
+
+    def __init__(self):
+        self.configs = {}
+
+    def pack(self, data):
+        OLD_FORMAT = 5
+        source_id = data["source_id"]
+        config = self.configs.setdefault(source_id,
+                                         StructSerializer.LocalConfig())
+
+        if config.field_order is None or \
+           not config.initial_sent or \
+           time.time() - config.last_format_sent > OLD_FORMAT:
+           # send|resend format
+            field_order = sorted(data.keys())
+
+            config.field_order = field_order
+            config.last_format_sent = time.time()
+            if not config.initial_sent:
+                config.initial_times -= 1
+                config.initial_sent = (config.initial_times <= 0)
+
+            forder = "\n".join(field_order)
+            flen = struct.pack("!H", len(field_order))
+            return "\x00{0}\x00{1}{2}".format(source_id, flen, forder)
+        else:
+            # send data
+            # time will be first after source_id
+            vals = [data["time"]]
+            for name in config.field_order:
+                if name in data:
+                    vals.append(data[name])
+            pack_fmt = "!" + ("I" * len(vals))
+            packed_data = struct.pack(pack_fmt, vals)
+            return "\x01{0}\x00{1}".format(source_id, packed_data)
+
+    def unpack(self, data):
+        code = data[0]
+        data = data[1:]
+        source_id, _, packed_data = data.partition("\x00")
+        config = self.configs.setdefault(source_id,
+                                         StructSerializer.LocalConfig())
+        unpacked_data = {"source_id":source_id}
+
+        if code == "\x00":
+            # fields order provided
+            flen = struct.unpack("!H", packed_data[:2])
+            forder = packed_data[2:].split("\n")
+            if len(forder) != flen:
+                return unpacked_data
+            config.field_order = forder
+            return unpacked_data
+
+        else:
+            # data provided
+            # try to find fields_order
+            if config.field_order is None:
+                raise ValueError("No fields order provided"
+                                 " for {0}, cannot unpack".format(source_id))
+
+            val_size = 4
+            if len(packed_data) % val_size != 0:
+                raise ValueError("Bad packet received"
+                                 " from {0}, cannot unpack".format(source_id))
+            datalen = len(packed_data) / val_size
+            pack_fmt = "!" + ("I" * datalen)
+            vals = struct.unpack(pack_fmt, packed_data)
+
+            unpacked_data['time'] = vals[0]
+            i = 1
+            for field in config.field_order:
+                data[field] = vals[i]
+                i += 1
+            return data
+
+
 class PickleSerializer(ISensortResultsSerializer):
     def pack(self, data):
         ndata = {}
@@ -36,31 +120,6 @@
     def unpack(self, data):
         return pickle.loads(data)
 
-try:
-    # try to use full-function lib
-    import msgpack
-
-    class mgspackSerializer(ISensortResultsSerializer):
-        def pack(self, data):
-            return msgpack.packb(data)
-
-        def unpack(self, data):
-            return msgpack.unpackb(data)
-
-    MSGPackSerializer = mgspackSerializer
-except ImportError:
-    # use local lib, if failed import
-    import umsgpack
-
-    class umsgspackSerializer(ISensortResultsSerializer):
-        def pack(self, data):
-            return umsgpack.packb(data)
-
-        def unpack(self, data):
-            return umsgpack.unpackb(data)
-
-    MSGPackSerializer = umsgspackSerializer
-
 # ------------------------------------- Transports ---------------------------
 
 
@@ -147,34 +206,6 @@
             raise Timeout()
 
 
-class HugeUDPTransport(ITransport, cp_transport.Sender):
-    def __init__(self, receiver, ip, port, packer_cls):
-        cp_transport.Sender.__init__(self, port=port, host=ip)
-        if receiver:
-            self.bind()
-
-    def send(self, data):
-        self.send_by_protocol(data)
-
-    def recv(self, timeout=None):
-        begin = time.time()
-
-        while True:
-
-            try:
-                # return not None, if packet is ready
-                ready = self.recv_by_protocol()
-                # if data ready - return it
-                if ready is not None:
-                    return ready
-                # if data not ready - check if it's time to die
-                if time.time() - begin >= timeout:
-                    break
-
-            except cp_transport.Timeout:
-                # no answer yet - check, if timeout end
-                if time.time() - begin >= timeout:
-                    break
 # -------------------------- Factory function --------------------------------
 
 
@@ -185,13 +216,9 @@
     elif parsed_uri.scheme == 'udp':
         ip, port = parsed_uri.netloc.split(":")
         return UDPTransport(receiver, ip=ip, port=int(port),
-                            packer_cls=PickleSerializer)
+                            packer_cls=StructSerializer)
     elif parsed_uri.scheme == 'file':
         return FileTransport(receiver, parsed_uri.path)
-    elif parsed_uri.scheme == 'hugeudp':
-        ip, port = parsed_uri.netloc.split(":")
-        return HugeUDPTransport(receiver, ip=ip, port=int(port),
-                                packer_cls=MSGPackSerializer)
     else:
         templ = "Can't instantiate transport from {0!r}"
         raise ValueError(templ.format(uri))
diff --git a/wally/sensors/umsgpack.py b/wally/sensors/umsgpack.py
deleted file mode 100644
index 0cdc83e..0000000
--- a/wally/sensors/umsgpack.py
+++ /dev/null
@@ -1,879 +0,0 @@
-# u-msgpack-python v2.0 - vsergeev at gmail
-# https://github.com/vsergeev/u-msgpack-python
-#
-# u-msgpack-python is a lightweight MessagePack serializer and deserializer
-# module, compatible with both Python 2 and 3, as well CPython and PyPy
-# implementations of Python. u-msgpack-python is fully compliant with the
-# latest MessagePack specification.com/msgpack/msgpack/blob/master/spec.md). In
-# particular, it supports the new binary, UTF-8 string, and application ext
-# types.
-#
-# MIT License
-#
-# Copyright (c) 2013-2014 Ivan A. Sergeev
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#
-"""
-u-msgpack-python v2.0 - vsergeev at gmail
-https://github.com/vsergeev/u-msgpack-python
-
-u-msgpack-python is a lightweight MessagePack serializer and deserializer
-module, compatible with both Python 2 and 3, as well CPython and PyPy
-implementations of Python. u-msgpack-python is fully compliant with the
-latest MessagePack specification.com/msgpack/msgpack/blob/master/spec.md). In
-particular, it supports the new binary, UTF-8 string, and application ext
-types.
-
-License: MIT
-"""
-
-__version__ = "2.0"
-"Module version string"
-
-version = (2,0)
-"Module version tuple"
-
-import struct
-import collections
-import sys
-import io
-
-################################################################################
-### Ext Class
-################################################################################
-
-# Extension type for application-defined types and data
-class Ext:
-    """
-    The Ext class facilitates creating a serializable extension object to store
-    an application-defined type and data byte array.
-    """
-
-    def __init__(self, type, data):
-        """
-        Construct a new Ext object.
-
-        Args:
-            type: application-defined type integer from 0 to 127
-            data: application-defined data byte array
-
-        Raises:
-            TypeError:
-                Specified ext type is outside of 0 to 127 range.
-
-        Example:
-        >>> foo = umsgpack.Ext(0x05, b"\x01\x02\x03")
-        >>> umsgpack.packb({u"special stuff": foo, u"awesome": True})
-        '\x82\xa7awesome\xc3\xadspecial stuff\xc7\x03\x05\x01\x02\x03'
-        >>> bar = umsgpack.unpackb(_)
-        >>> print(bar["special stuff"])
-        Ext Object (Type: 0x05, Data: 01 02 03)
-        >>>
-        """
-        # Application ext type should be 0 <= type <= 127
-        if not isinstance(type, int) or not (type >= 0 and type <= 127):
-            raise TypeError("ext type out of range")
-        # Check data is type bytes
-        elif sys.version_info[0] == 3 and not isinstance(data, bytes):
-            raise TypeError("ext data is not type \'bytes\'")
-        elif sys.version_info[0] == 2 and not isinstance(data, str):
-            raise TypeError("ext data is not type \'str\'")
-        self.type = type
-        self.data = data
-
-    def __eq__(self, other):
-        """
-        Compare this Ext object with another for equality.
-        """
-        return (isinstance(other, self.__class__) and
-                self.type == other.type and
-                self.data == other.data)
-
-    def __ne__(self, other):
-        """
-        Compare this Ext object with another for inequality.
-        """
-        return not self.__eq__(other)
-
-    def __str__(self):
-        """
-        String representation of this Ext object.
-        """
-        s = "Ext Object (Type: 0x%02x, Data: " % self.type
-        for i in range(min(len(self.data), 8)):
-            if i > 0:
-                s += " "
-            if isinstance(self.data[i], int):
-                s += "%02x" % (self.data[i])
-            else:
-                s += "%02x" % ord(self.data[i])
-        if len(self.data) > 8:
-            s += " ..."
-        s += ")"
-        return s
-
-################################################################################
-### Exceptions
-################################################################################
-
-# Base Exception classes
-class PackException(Exception):
-    "Base class for exceptions encountered during packing."
-    pass
-class UnpackException(Exception):
-    "Base class for exceptions encountered during unpacking."
-    pass
-
-# Packing error
-class UnsupportedTypeException(PackException):
-    "Object type not supported for packing."
-    pass
-
-# Unpacking error
-class InsufficientDataException(UnpackException):
-    "Insufficient data to unpack the encoded object."
-    pass
-class InvalidStringException(UnpackException):
-    "Invalid UTF-8 string encountered during unpacking."
-    pass
-class ReservedCodeException(UnpackException):
-    "Reserved code encountered during unpacking."
-    pass
-class UnhashableKeyException(UnpackException):
-    """
-    Unhashable key encountered during map unpacking.
-    The serialized map cannot be deserialized into a Python dictionary.
-    """
-    pass
-class DuplicateKeyException(UnpackException):
-    "Duplicate key encountered during map unpacking."
-    pass
-
-# Backwards compatibility
-KeyNotPrimitiveException = UnhashableKeyException
-KeyDuplicateException = DuplicateKeyException
-
-################################################################################
-### Exported Functions and Globals
-################################################################################
-
-# Exported functions and variables, set up in __init()
-pack = None
-packb = None
-unpack = None
-unpackb = None
-dump = None
-dumps = None
-load = None
-loads = None
-
-compatibility = False
-"""
-Compatibility mode boolean.
-
-When compatibility mode is enabled, u-msgpack-python will serialize both
-unicode strings and bytes into the old "raw" msgpack type, and deserialize the
-"raw" msgpack type into bytes. This provides backwards compatibility with the
-old MessagePack specification.
-
-Example:
->>> umsgpack.compatibility = True
->>>
->>> umsgpack.packb([u"some string", b"some bytes"])
-b'\x92\xabsome string\xaasome bytes'
->>> umsgpack.unpackb(_)
-[b'some string', b'some bytes']
->>>
-"""
-
-################################################################################
-### Packing
-################################################################################
-
-# You may notice struct.pack("B", obj) instead of the simpler chr(obj) in the
-# code below. This is to allow for seamless Python 2 and 3 compatibility, as
-# chr(obj) has a str return type instead of bytes in Python 3, and
-# struct.pack(...) has the right return type in both versions.
-
-def _pack_integer(obj, fp):
-    if obj < 0:
-        if obj >= -32:
-            fp.write(struct.pack("b", obj))
-        elif obj >= -2**(8-1):
-            fp.write(b"\xd0" + struct.pack("b", obj))
-        elif obj >= -2**(16-1):
-            fp.write(b"\xd1" + struct.pack(">h", obj))
-        elif obj >= -2**(32-1):
-            fp.write(b"\xd2" + struct.pack(">i", obj))
-        elif obj >= -2**(64-1):
-            fp.write(b"\xd3" + struct.pack(">q", obj))
-        else:
-            raise UnsupportedTypeException("huge signed int")
-    else:
-        if obj <= 127:
-            fp.write(struct.pack("B", obj))
-        elif obj <= 2**8-1:
-            fp.write(b"\xcc" + struct.pack("B", obj))
-        elif obj <= 2**16-1:
-            fp.write(b"\xcd" + struct.pack(">H", obj))
-        elif obj <= 2**32-1:
-            fp.write(b"\xce" + struct.pack(">I", obj))
-        elif obj <= 2**64-1:
-            fp.write(b"\xcf" + struct.pack(">Q", obj))
-        else:
-            raise UnsupportedTypeException("huge unsigned int")
-
-def _pack_nil(obj, fp):
-    fp.write(b"\xc0")
-
-def _pack_boolean(obj, fp):
-    fp.write(b"\xc3" if obj else b"\xc2")
-
-def _pack_float(obj, fp):
-    if _float_size == 64:
-        fp.write(b"\xcb" + struct.pack(">d", obj))
-    else:
-        fp.write(b"\xca" + struct.pack(">f", obj))
-
-def _pack_string(obj, fp):
-    obj = obj.encode('utf-8')
-    if len(obj) <= 31:
-        fp.write(struct.pack("B", 0xa0 | len(obj)) + obj)
-    elif len(obj) <= 2**8-1:
-        fp.write(b"\xd9" + struct.pack("B", len(obj)) + obj)
-    elif len(obj) <= 2**16-1:
-        fp.write(b"\xda" + struct.pack(">H", len(obj)) + obj)
-    elif len(obj) <= 2**32-1:
-        fp.write(b"\xdb" + struct.pack(">I", len(obj)) + obj)
-    else:
-        raise UnsupportedTypeException("huge string")
-
-def _pack_binary(obj, fp):
-    if len(obj) <= 2**8-1:
-        fp.write(b"\xc4" + struct.pack("B", len(obj)) + obj)
-    elif len(obj) <= 2**16-1:
-        fp.write(b"\xc5" + struct.pack(">H", len(obj)) + obj)
-    elif len(obj) <= 2**32-1:
-        fp.write(b"\xc6" + struct.pack(">I", len(obj)) + obj)
-    else:
-        raise UnsupportedTypeException("huge binary string")
-
-def _pack_oldspec_raw(obj, fp):
-    if len(obj) <= 31:
-        fp.write(struct.pack("B", 0xa0 | len(obj)) + obj)
-    elif len(obj) <= 2**16-1:
-        fp.write(b"\xda" + struct.pack(">H", len(obj)) + obj)
-    elif len(obj) <= 2**32-1:
-        fp.write(b"\xdb" + struct.pack(">I", len(obj)) + obj)
-    else:
-        raise UnsupportedTypeException("huge raw string")
-
-def _pack_ext(obj, fp):
-    if len(obj.data) == 1:
-        fp.write(b"\xd4" + struct.pack("B", obj.type & 0xff) + obj.data)
-    elif len(obj.data) == 2:
-        fp.write(b"\xd5" + struct.pack("B", obj.type & 0xff) + obj.data)
-    elif len(obj.data) == 4:
-        fp.write(b"\xd6" + struct.pack("B", obj.type & 0xff) + obj.data)
-    elif len(obj.data) == 8:
-        fp.write(b"\xd7" + struct.pack("B", obj.type & 0xff) + obj.data)
-    elif len(obj.data) == 16:
-        fp.write(b"\xd8" + struct.pack("B", obj.type & 0xff) + obj.data)
-    elif len(obj.data) <= 2**8-1:
-        fp.write(b"\xc7" + struct.pack("BB", len(obj.data), obj.type & 0xff) + obj.data)
-    elif len(obj.data) <= 2**16-1:
-        fp.write(b"\xc8" + struct.pack(">HB", len(obj.data), obj.type & 0xff) + obj.data)
-    elif len(obj.data) <= 2**32-1:
-        fp.write(b"\xc9" + struct.pack(">IB", len(obj.data), obj.type & 0xff) + obj.data)
-    else:
-        raise UnsupportedTypeException("huge ext data")
-
-def _pack_array(obj, fp):
-    if len(obj) <= 15:
-        fp.write(struct.pack("B", 0x90 | len(obj)))
-    elif len(obj) <= 2**16-1:
-        fp.write(b"\xdc" + struct.pack(">H", len(obj)))
-    elif len(obj) <= 2**32-1:
-        fp.write(b"\xdd" + struct.pack(">I", len(obj)))
-    else:
-        raise UnsupportedTypeException("huge array")
-
-    for e in obj:
-        pack(e, fp)
-
-def _pack_map(obj, fp):
-    if len(obj) <= 15:
-        fp.write(struct.pack("B", 0x80 | len(obj)))
-    elif len(obj) <= 2**16-1:
-        fp.write(b"\xde" + struct.pack(">H", len(obj)))
-    elif len(obj) <= 2**32-1:
-        fp.write(b"\xdf" + struct.pack(">I", len(obj)))
-    else:
-        raise UnsupportedTypeException("huge array")
-
-    for k,v in obj.items():
-        pack(k, fp)
-        pack(v, fp)
-
-########################################
-
-# Pack for Python 2, with 'unicode' type, 'str' type, and 'long' type
-def _pack2(obj, fp):
-    """
-    Serialize a Python object into MessagePack bytes.
-
-    Args:
-        obj: a Python object
-        fp: a .write()-supporting file-like object
-
-    Returns:
-        None.
-
-    Raises:
-        UnsupportedType(PackException):
-            Object type not supported for packing.
-
-    Example:
-    >>> f = open('test.bin', 'w')
-    >>> umsgpack.pack({u"compact": True, u"schema": 0}, f)
-    >>>
-    """
-
-    global compatibility
-
-    if obj is None:
-        _pack_nil(obj, fp)
-    elif isinstance(obj, bool):
-        _pack_boolean(obj, fp)
-    elif isinstance(obj, int) or isinstance(obj, long):
-        _pack_integer(obj, fp)
-    elif isinstance(obj, float):
-        _pack_float(obj, fp)
-    elif compatibility and isinstance(obj, unicode):
-        _pack_oldspec_raw(bytes(obj), fp)
-    elif compatibility and isinstance(obj, bytes):
-        _pack_oldspec_raw(obj, fp)
-    elif isinstance(obj, unicode):
-        _pack_string(obj, fp)
-    elif isinstance(obj, str):
-        _pack_binary(obj, fp)
-    elif isinstance(obj, list) or isinstance(obj, tuple):
-        _pack_array(obj, fp)
-    elif isinstance(obj, dict):
-        _pack_map(obj, fp)
-    elif isinstance(obj, Ext):
-        _pack_ext(obj, fp)
-    else:
-        raise UnsupportedTypeException("unsupported type: %s" % str(type(obj)))
-
-# Pack for Python 3, with unicode 'str' type, 'bytes' type, and no 'long' type
-def _pack3(obj, fp):
-    """
-    Serialize a Python object into MessagePack bytes.
-
-    Args:
-        obj: a Python object
-        fp: a .write()-supporting file-like object
-
-    Returns:
-        None.
-
-    Raises:
-        UnsupportedType(PackException):
-            Object type not supported for packing.
-
-    Example:
-    >>> f = open('test.bin', 'w')
-    >>> umsgpack.pack({u"compact": True, u"schema": 0}, fp)
-    >>>
-    """
-    global compatibility
-
-    if obj is None:
-        _pack_nil(obj, fp)
-    elif isinstance(obj, bool):
-        _pack_boolean(obj, fp)
-    elif isinstance(obj, int):
-        _pack_integer(obj, fp)
-    elif isinstance(obj, float):
-        _pack_float(obj, fp)
-    elif compatibility and isinstance(obj, str):
-        _pack_oldspec_raw(obj.encode('utf-8'), fp)
-    elif compatibility and isinstance(obj, bytes):
-        _pack_oldspec_raw(obj, fp)
-    elif isinstance(obj, str):
-        _pack_string(obj, fp)
-    elif isinstance(obj, bytes):
-        _pack_binary(obj, fp)
-    elif isinstance(obj, list) or isinstance(obj, tuple):
-        _pack_array(obj, fp)
-    elif isinstance(obj, dict):
-        _pack_map(obj, fp)
-    elif isinstance(obj, Ext):
-        _pack_ext(obj, fp)
-    else:
-        raise UnsupportedTypeException("unsupported type: %s" % str(type(obj)))
-
-def _packb2(obj):
-    """
-    Serialize a Python object into MessagePack bytes.
-
-    Args:
-        obj: a Python object
-
-    Returns:
-        A 'str' containing serialized MessagePack bytes.
-
-    Raises:
-        UnsupportedType(PackException):
-            Object type not supported for packing.
-
-    Example:
-    >>> umsgpack.packb({u"compact": True, u"schema": 0})
-    '\x82\xa7compact\xc3\xa6schema\x00'
-    >>>
-    """
-    fp = io.BytesIO()
-    _pack2(obj, fp)
-    return fp.getvalue()
-
-def _packb3(obj):
-    """
-    Serialize a Python object into MessagePack bytes.
-
-    Args:
-        obj: a Python object
-
-    Returns:
-        A 'bytes' containing serialized MessagePack bytes.
-
-    Raises:
-        UnsupportedType(PackException):
-            Object type not supported for packing.
-
-    Example:
-    >>> umsgpack.packb({u"compact": True, u"schema": 0})
-    b'\x82\xa7compact\xc3\xa6schema\x00'
-    >>>
-    """
-    fp = io.BytesIO()
-    _pack3(obj, fp)
-    return fp.getvalue()
-
-################################################################################
-### Unpacking
-################################################################################
-
-def _read_except(fp, n):
-    data = fp.read(n)
-    if len(data) < n:
-        raise InsufficientDataException()
-    return data
-
-def _unpack_integer(code, fp):
-    if (ord(code) & 0xe0) == 0xe0:
-        return struct.unpack("b", code)[0]
-    elif code == b'\xd0':
-        return struct.unpack("b", _read_except(fp, 1))[0]
-    elif code == b'\xd1':
-        return struct.unpack(">h", _read_except(fp, 2))[0]
-    elif code == b'\xd2':
-        return struct.unpack(">i", _read_except(fp, 4))[0]
-    elif code == b'\xd3':
-        return struct.unpack(">q", _read_except(fp, 8))[0]
-    elif (ord(code) & 0x80) == 0x00:
-        return struct.unpack("B", code)[0]
-    elif code == b'\xcc':
-        return struct.unpack("B", _read_except(fp, 1))[0]
-    elif code == b'\xcd':
-        return struct.unpack(">H", _read_except(fp, 2))[0]
-    elif code == b'\xce':
-        return struct.unpack(">I", _read_except(fp, 4))[0]
-    elif code == b'\xcf':
-        return struct.unpack(">Q", _read_except(fp, 8))[0]
-    raise Exception("logic error, not int: 0x%02x" % ord(code))
-
-def _unpack_reserved(code, fp):
-    if code == b'\xc1':
-        raise ReservedCodeException("encountered reserved code: 0x%02x" % ord(code))
-    raise Exception("logic error, not reserved code: 0x%02x" % ord(code))
-
-def _unpack_nil(code, fp):
-    if code == b'\xc0':
-        return None
-    raise Exception("logic error, not nil: 0x%02x" % ord(code))
-
-def _unpack_boolean(code, fp):
-    if code == b'\xc2':
-        return False
-    elif code == b'\xc3':
-        return True
-    raise Exception("logic error, not boolean: 0x%02x" % ord(code))
-
-def _unpack_float(code, fp):
-    if code == b'\xca':
-        return struct.unpack(">f", _read_except(fp, 4))[0]
-    elif code == b'\xcb':
-        return struct.unpack(">d", _read_except(fp, 8))[0]
-    raise Exception("logic error, not float: 0x%02x" % ord(code))
-
-def _unpack_string(code, fp):
-    if (ord(code) & 0xe0) == 0xa0:
-        length = ord(code) & ~0xe0
-    elif code == b'\xd9':
-        length = struct.unpack("B", _read_except(fp, 1))[0]
-    elif code == b'\xda':
-        length = struct.unpack(">H", _read_except(fp, 2))[0]
-    elif code == b'\xdb':
-        length = struct.unpack(">I", _read_except(fp, 4))[0]
-    else:
-        raise Exception("logic error, not string: 0x%02x" % ord(code))
-
-    # Always return raw bytes in compatibility mode
-    global compatibility
-    if compatibility:
-        return _read_except(fp, length)
-
-    try:
-        return bytes.decode(_read_except(fp, length), 'utf-8')
-    except UnicodeDecodeError:
-        raise InvalidStringException("unpacked string is not utf-8")
-
-def _unpack_binary(code, fp):
-    if code == b'\xc4':
-        length = struct.unpack("B", _read_except(fp, 1))[0]
-    elif code == b'\xc5':
-        length = struct.unpack(">H", _read_except(fp, 2))[0]
-    elif code == b'\xc6':
-        length = struct.unpack(">I", _read_except(fp, 4))[0]
-    else:
-        raise Exception("logic error, not binary: 0x%02x" % ord(code))
-
-    return _read_except(fp, length)
-
-def _unpack_ext(code, fp):
-    if code == b'\xd4':
-        length = 1
-    elif code == b'\xd5':
-        length = 2
-    elif code == b'\xd6':
-        length = 4
-    elif code == b'\xd7':
-        length = 8
-    elif code == b'\xd8':
-        length = 16
-    elif code == b'\xc7':
-        length = struct.unpack("B", _read_except(fp, 1))[0]
-    elif code == b'\xc8':
-        length = struct.unpack(">H", _read_except(fp, 2))[0]
-    elif code == b'\xc9':
-        length = struct.unpack(">I", _read_except(fp, 4))[0]
-    else:
-        raise Exception("logic error, not ext: 0x%02x" % ord(code))
-
-    return Ext(ord(_read_except(fp, 1)), _read_except(fp, length))
-
-def _unpack_array(code, fp):
-    if (ord(code) & 0xf0) == 0x90:
-        length = (ord(code) & ~0xf0)
-    elif code == b'\xdc':
-        length = struct.unpack(">H", _read_except(fp, 2))[0]
-    elif code == b'\xdd':
-        length = struct.unpack(">I", _read_except(fp, 4))[0]
-    else:
-        raise Exception("logic error, not array: 0x%02x" % ord(code))
-
-    return [_unpack(fp) for i in range(length)]
-
-def _deep_list_to_tuple(obj):
-    if isinstance(obj, list):
-        return tuple([_deep_list_to_tuple(e) for e in obj])
-    return obj
-
-def _unpack_map(code, fp):
-    if (ord(code) & 0xf0) == 0x80:
-        length = (ord(code) & ~0xf0)
-    elif code == b'\xde':
-        length = struct.unpack(">H", _read_except(fp, 2))[0]
-    elif code == b'\xdf':
-        length = struct.unpack(">I", _read_except(fp, 4))[0]
-    else:
-        raise Exception("logic error, not map: 0x%02x" % ord(code))
-
-    d = {}
-    for i in range(length):
-        # Unpack key
-        k = _unpack(fp)
-
-        if isinstance(k, list):
-            # Attempt to convert list into a hashable tuple
-            k = _deep_list_to_tuple(k)
-        elif not isinstance(k, collections.Hashable):
-            raise UnhashableKeyException("encountered unhashable key: %s, %s" % (str(k), str(type(k))))
-        elif k in d:
-            raise DuplicateKeyException("encountered duplicate key: %s, %s" % (str(k), str(type(k))))
-
-        # Unpack value
-        v = _unpack(fp)
-
-        try:
-            d[k] = v
-        except TypeError:
-            raise UnhashableKeyException("encountered unhashable key: %s" % str(k))
-    return d
-
-def _unpack(fp):
-    code = _read_except(fp, 1)
-    return _unpack_dispatch_table[code](code, fp)
-
-########################################
-
-def _unpack2(fp):
-    """
-    Deserialize MessagePack bytes into a Python object.
-
-    Args:
-        fp: a .read()-supporting file-like object
-
-    Returns:
-        A Python object.
-
-    Raises:
-        InsufficientDataException(UnpackException):
-            Insufficient data to unpack the encoded object.
-        InvalidStringException(UnpackException):
-            Invalid UTF-8 string encountered during unpacking.
-        ReservedCodeException(UnpackException):
-            Reserved code encountered during unpacking.
-        UnhashableKeyException(UnpackException):
-            Unhashable key encountered during map unpacking.
-            The serialized map cannot be deserialized into a Python dictionary.
-        DuplicateKeyException(UnpackException):
-            Duplicate key encountered during map unpacking.
-
-    Example:
-    >>> f = open("test.bin")
-    >>> umsgpack.unpackb(f)
-    {u'compact': True, u'schema': 0}
-    >>>
-    """
-    return _unpack(fp)
-
-def _unpack3(fp):
-    """
-    Deserialize MessagePack bytes into a Python object.
-
-    Args:
-        fp: a .read()-supporting file-like object
-
-    Returns:
-        A Python object.
-
-    Raises:
-        InsufficientDataException(UnpackException):
-            Insufficient data to unpack the encoded object.
-        InvalidStringException(UnpackException):
-            Invalid UTF-8 string encountered during unpacking.
-        ReservedCodeException(UnpackException):
-            Reserved code encountered during unpacking.
-        UnhashableKeyException(UnpackException):
-            Unhashable key encountered during map unpacking.
-            The serialized map cannot be deserialized into a Python dictionary.
-        DuplicateKeyException(UnpackException):
-            Duplicate key encountered during map unpacking.
-
-    Example:
-    >>> f = open("test.bin")
-    >>> umsgpack.unpackb(f)
-    {'compact': True, 'schema': 0}
-    >>>
-    """
-    return _unpack(fp)
-
-# For Python 2, expects a str object
-def _unpackb2(s):
-    """
-    Deserialize MessagePack bytes into a Python object.
-
-    Args:
-        s: a 'str' containing serialized MessagePack bytes
-
-    Returns:
-        A Python object.
-
-    Raises:
-        TypeError:
-            Packed data is not type 'str'.
-        InsufficientDataException(UnpackException):
-            Insufficient data to unpack the encoded object.
-        InvalidStringException(UnpackException):
-            Invalid UTF-8 string encountered during unpacking.
-        ReservedCodeException(UnpackException):
-            Reserved code encountered during unpacking.
-        UnhashableKeyException(UnpackException):
-            Unhashable key encountered during map unpacking.
-            The serialized map cannot be deserialized into a Python dictionary.
-        DuplicateKeyException(UnpackException):
-            Duplicate key encountered during map unpacking.
-
-    Example:
-    >>> umsgpack.unpackb(b'\x82\xa7compact\xc3\xa6schema\x00')
-    {u'compact': True, u'schema': 0}
-    >>>
-    """
-    if not isinstance(s, str):
-        raise TypeError("packed data is not type 'str'")
-    return _unpack(io.BytesIO(s))
-
-# For Python 3, expects a bytes object
-def _unpackb3(s):
-    """
-    Deserialize MessagePack bytes into a Python object.
-
-    Args:
-        s: a 'bytes' containing serialized MessagePack bytes
-
-    Returns:
-        A Python object.
-
-    Raises:
-        TypeError:
-            Packed data is not type 'bytes'.
-        InsufficientDataException(UnpackException):
-            Insufficient data to unpack the encoded object.
-        InvalidStringException(UnpackException):
-            Invalid UTF-8 string encountered during unpacking.
-        ReservedCodeException(UnpackException):
-            Reserved code encountered during unpacking.
-        UnhashableKeyException(UnpackException):
-            Unhashable key encountered during map unpacking.
-            The serialized map cannot be deserialized into a Python dictionary.
-        DuplicateKeyException(UnpackException):
-            Duplicate key encountered during map unpacking.
-
-    Example:
-    >>> umsgpack.unpackb(b'\x82\xa7compact\xc3\xa6schema\x00')
-    {'compact': True, 'schema': 0}
-    >>>
-    """
-    if not isinstance(s, bytes):
-        raise TypeError("packed data is not type 'bytes'")
-    return _unpack(io.BytesIO(s))
-
-################################################################################
-### Module Initialization
-################################################################################
-
-def __init():
-    global pack
-    global packb
-    global unpack
-    global unpackb
-    global dump
-    global dumps
-    global load
-    global loads
-    global compatibility
-    global _float_size
-    global _unpack_dispatch_table
-
-    # Compatibility mode for handling strings/bytes with the old specification
-    compatibility = False
-
-    # Auto-detect system float precision
-    if sys.float_info.mant_dig == 53:
-        _float_size = 64
-    else:
-        _float_size = 32
-
-    # Map packb and unpackb to the appropriate version
-    if sys.version_info[0] == 3:
-        pack = _pack3
-        packb = _packb3
-        dump = _pack3
-        dumps = _packb3
-        unpack = _unpack3
-        unpackb = _unpackb3
-        load = _unpack3
-        loads = _unpackb3
-    else:
-        pack = _pack2
-        packb = _packb2
-        dump = _pack2
-        dumps = _packb2
-        unpack = _unpack2
-        unpackb = _unpackb2
-        load = _unpack2
-        loads = _unpackb2
-
-    # Build a dispatch table for fast lookup of unpacking function
-
-    _unpack_dispatch_table = {}
-    # Fix uint
-    for code in range(0, 0x7f+1):
-        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
-    # Fix map
-    for code in range(0x80, 0x8f+1):
-        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_map
-    # Fix array
-    for code in range(0x90, 0x9f+1):
-        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_array
-    # Fix str
-    for code in range(0xa0, 0xbf+1):
-        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_string
-    # Nil
-    _unpack_dispatch_table[b'\xc0'] = _unpack_nil
-    # Reserved
-    _unpack_dispatch_table[b'\xc1'] = _unpack_reserved
-    # Boolean
-    _unpack_dispatch_table[b'\xc2'] = _unpack_boolean
-    _unpack_dispatch_table[b'\xc3'] = _unpack_boolean
-    # Bin
-    for code in range(0xc4, 0xc6+1):
-        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_binary
-    # Ext
-    for code in range(0xc7, 0xc9+1):
-        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_ext
-    # Float
-    _unpack_dispatch_table[b'\xca'] = _unpack_float
-    _unpack_dispatch_table[b'\xcb'] = _unpack_float
-    # Uint
-    for code in range(0xcc, 0xcf+1):
-        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
-    # Int
-    for code in range(0xd0, 0xd3+1):
-        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
-    # Fixext
-    for code in range(0xd4, 0xd8+1):
-        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_ext
-    # String
-    for code in range(0xd9, 0xdb+1):
-        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_string
-    # Array
-    _unpack_dispatch_table[b'\xdc'] = _unpack_array
-    _unpack_dispatch_table[b'\xdd'] = _unpack_array
-    # Map
-    _unpack_dispatch_table[b'\xde'] = _unpack_map
-    _unpack_dispatch_table[b'\xdf'] = _unpack_map
-    # Negative fixint
-    for code in range(0xe0, 0xff+1):
-        _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
-
-__init()
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
index e44704d..7c8cb98 100644
--- a/wally/sensors_utils.py
+++ b/wally/sensors_utils.py
@@ -14,7 +14,7 @@
 logger = logging.getLogger("wally")
 
 
-def save_sensors_data(data_q, mon_q, fd):
+def save_sensors_data(data_q, mon_q, fd, sensors_configs):
     fd.write("\n")
 
     observed_nodes = set()
@@ -26,11 +26,13 @@
                 break
 
             addr, data = val
+
             if addr not in observed_nodes:
                 mon_q.put(addr + (data['source_id'],))
                 observed_nodes.add(addr)
 
-            fd.write("{0!s} : {1!r}\n".format(time.time(), repr(val)))
+            fd.write("{0!s} : {1!r}\n".format(time.time(),
+                     repr((addr, data))))
     except Exception:
         logger.exception("Error in sensors thread")
     logger.info("Sensors thread exits")
@@ -80,7 +82,8 @@
     mon_q = Queue.Queue()
     fd = open(cfg_dict['sensor_storage'], "w")
     sensor_listen_th = threading.Thread(None, save_sensors_data, None,
-                                        (sensors_data_q, mon_q, fd))
+                                        (sensors_data_q, mon_q, fd,
+                                         sensors_configs))
     sensor_listen_th.daemon = True
     sensor_listen_th.start()