improvement of protocol
refactoring
send in field order
receiver unpacking
refactoring of packer code
fields send, pack replacement
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
index 7688f31..d157d98 100644
--- a/wally/sensors/protocol.py
+++ b/wally/sensors/protocol.py
@@ -1,5 +1,6 @@
import sys
import time
+import struct
import socket
import select
import cPickle as pickle
@@ -23,6 +24,89 @@
pass
+class StructSerializer(ISensortResultsSerializer):
+ class LocalConfig(object):
+ def __init__(self):
+ self.last_format_sent = -1
+ self.initial_sent = False
+ self.initial_times = 5
+ self.field_order = None
+
+ def __init__(self):
+ self.configs = {}
+
+ def pack(self, data):
+ OLD_FORMAT = 5
+ source_id = data["source_id"]
+ config = self.configs.setdefault(source_id,
+ StructSerializer.LocalConfig())
+
+ if config.field_order is None or \
+ not config.initial_sent or \
+ time.time() - config.last_format_sent > OLD_FORMAT:
+ # send|resend format
+ field_order = sorted(data.keys())
+
+ config.field_order = field_order
+ config.last_format_sent = time.time()
+ if not config.initial_sent:
+ config.initial_times -= 1
+ config.initial_sent = (config.initial_times <= 0)
+
+ forder = "\n".join(field_order)
+ flen = struct.pack("!H", len(field_order))
+ return "\x00{0}\x00{1}{2}".format(source_id, flen, forder)
+ else:
+ # send data
+ # time will be first after source_id
+ vals = [data["time"]]
+ for name in config.field_order:
+ if name in data:
+ vals.append(data[name])
+ pack_fmt = "!" + ("I" * len(vals))
+ packed_data = struct.pack(pack_fmt, vals)
+ return "\x01{0}\x00{1}".format(source_id, packed_data)
+
+ def unpack(self, data):
+ code = data[0]
+ data = data[1:]
+ source_id, _, packed_data = data.partition("\x00")
+ config = self.configs.setdefault(source_id,
+ StructSerializer.LocalConfig())
+ unpacked_data = {"source_id":source_id}
+
+ if code == "\x00":
+ # fields order provided
+ flen = struct.unpack("!H", packed_data[:2])
+ forder = packed_data[2:].split("\n")
+ if len(forder) != flen:
+ return unpacked_data
+ config.field_order = forder
+ return unpacked_data
+
+ else:
+ # data provided
+ # try to find fields_order
+ if config.field_order is None:
+ raise ValueError("No fields order provided"
+ " for {0}, cannot unpack".format(source_id))
+
+ val_size = 4
+ if len(packed_data) % val_size != 0:
+ raise ValueError("Bad packet received"
+ " from {0}, cannot unpack".format(source_id))
+ datalen = len(packed_data) / val_size
+ pack_fmt = "!" + ("I" * datalen)
+ vals = struct.unpack(pack_fmt, packed_data)
+
+ unpacked_data['time'] = vals[0]
+ i = 1
+ for field in config.field_order:
+ data[field] = vals[i]
+ i += 1
+ return data
+
+
class PickleSerializer(ISensortResultsSerializer):
def pack(self, data):
ndata = {}
@@ -36,31 +120,6 @@
def unpack(self, data):
return pickle.loads(data)
-try:
- # try to use full-function lib
- import msgpack
-
- class mgspackSerializer(ISensortResultsSerializer):
- def pack(self, data):
- return msgpack.packb(data)
-
- def unpack(self, data):
- return msgpack.unpackb(data)
-
- MSGPackSerializer = mgspackSerializer
-except ImportError:
- # use local lib, if failed import
- import umsgpack
-
- class umsgspackSerializer(ISensortResultsSerializer):
- def pack(self, data):
- return umsgpack.packb(data)
-
- def unpack(self, data):
- return umsgpack.unpackb(data)
-
- MSGPackSerializer = umsgspackSerializer
-
# ------------------------------------- Transports ---------------------------
@@ -147,34 +206,6 @@
raise Timeout()
-class HugeUDPTransport(ITransport, cp_transport.Sender):
- def __init__(self, receiver, ip, port, packer_cls):
- cp_transport.Sender.__init__(self, port=port, host=ip)
- if receiver:
- self.bind()
-
- def send(self, data):
- self.send_by_protocol(data)
-
- def recv(self, timeout=None):
- begin = time.time()
-
- while True:
-
- try:
- # return not None, if packet is ready
- ready = self.recv_by_protocol()
- # if data ready - return it
- if ready is not None:
- return ready
- # if data not ready - check if it's time to die
- if time.time() - begin >= timeout:
- break
-
- except cp_transport.Timeout:
- # no answer yet - check, if timeout end
- if time.time() - begin >= timeout:
- break
# -------------------------- Factory function --------------------------------
@@ -185,13 +216,9 @@
elif parsed_uri.scheme == 'udp':
ip, port = parsed_uri.netloc.split(":")
return UDPTransport(receiver, ip=ip, port=int(port),
- packer_cls=PickleSerializer)
+ packer_cls=StructSerializer)
elif parsed_uri.scheme == 'file':
return FileTransport(receiver, parsed_uri.path)
- elif parsed_uri.scheme == 'hugeudp':
- ip, port = parsed_uri.netloc.split(":")
- return HugeUDPTransport(receiver, ip=ip, port=int(port),
- packer_cls=MSGPackSerializer)
else:
templ = "Can't instantiate transport from {0!r}"
raise ValueError(templ.format(uri))