Fix sensor UDP protocol: split StructSerializer into send/recv classes, handle unpack errors in recv_main
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
index ea73789..e8c6261 100644
--- a/wally/sensors/api.py
+++ b/wally/sensors/api.py
@@ -4,7 +4,7 @@
from .deploy_sensors import (deploy_and_start_sensors,
stop_and_remove_sensors)
-from .protocol import create_protocol, Timeout
+from .protocol import create_protocol, Timeout, CantUnpack
__all__ = ['Empty', 'recv_main',
@@ -32,9 +32,15 @@
def recv_main(proto, data_q, cmd_q):
while True:
try:
- data_q.put(proto.recv(0.1))
+ ip, packet = proto.recv(0.1)
+ if packet is not None:
+ data_q.put((ip, packet))
+ except AssertionError as exc:
+ logger.warning("Error in sensor data " + str(exc))
except Timeout:
pass
+ except CantUnpack as exc:
+ logger.warning("Can't unpack sensor data: " + str(exc))  # same channel as the AssertionError handler
try:
val = cmd_q.get(False)
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
index 9fd1a84..fad7e00 100644
--- a/wally/sensors/protocol.py
+++ b/wally/sensors/protocol.py
@@ -6,13 +6,15 @@
import cPickle as pickle
from urlparse import urlparse
-from . import cp_transport
-
class Timeout(Exception):
pass
+class CantUnpack(Exception):
+ pass
+
+
# ------------------------------------- Serializers --------------------------
@@ -24,87 +26,114 @@
pass
-class StructSerializer(ISensortResultsSerializer):
- class LocalConfig(object):
- def __init__(self):
- self.last_format_sent = -1
- self.initial_sent = False
- self.initial_times = 5
- self.field_order = None
+class StructSerializerSend(ISensortResultsSerializer):
+ initial_times = 5
+ resend_timeout = 60
+ HEADERS = 'h'
+ DATA = 'd'
+ END_OF_HEADERS = '\x00'
+ END_OF_SOURCE_ID = '\x00'
+ HEADERS_SEPARATOR = '\n'
def __init__(self):
- self.configs = {}
+ self.field_order = None
+ self.headers_send_cycles_left = self.initial_times
+ self.pack_fmt = None
+ self.next_header_send_time = None
def pack(self, data):
- OLD_FORMAT = 5
- source_id = data["source_id"]
- config = self.configs.setdefault(source_id,
- StructSerializer.LocalConfig())
+ data = data.copy()
- if config.field_order is None or \
- not config.initial_sent or \
- time.time() - config.last_format_sent > OLD_FORMAT:
- # send|resend format
- field_order = sorted(data.keys())
+ source_id = data.pop("source_id")
+ vals = [int(data.pop("time").value)]
- config.field_order = field_order
- config.last_format_sent = time.time()
- if not config.initial_sent:
- config.initial_times -= 1
- config.initial_sent = (config.initial_times <= 0)
+ if self.field_order is None:
+ self.field_order = sorted(data.keys())
+ self.pack_fmt = "!I" + "I" * len(self.field_order)
- forder = "\n".join(field_order)
- flen = struct.pack("!H", len(field_order))
- return "\x00{0}\x00{1}{2}".format(source_id, flen, forder)
+ need_resend = False
+ if self.next_header_send_time is not None:
+ if time.time() > self.next_header_send_time:
+ need_resend = True
+
+ if self.headers_send_cycles_left > 0 or need_resend:
+ forder = self.HEADERS_SEPARATOR.join(self.field_order)
+ flen = struct.pack("!H", len(self.field_order))
+
+ result = (self.HEADERS + source_id +
+ self.END_OF_SOURCE_ID +
+ flen + forder + self.END_OF_HEADERS)
+
+ if self.headers_send_cycles_left > 0:
+ self.headers_send_cycles_left -= 1
+
+ self.next_header_send_time = time.time() + self.resend_timeout
else:
- # send data
- # time will be first after source_id
- vals = [data["time"]]
- for name in config.field_order:
- if name in data:
- vals.append(data[name])
- pack_fmt = "!" + ("I" * len(vals))
- packed_data = struct.pack(pack_fmt, vals)
- return "\x01{0}\x00{1}".format(source_id, packed_data)
+ result = ""
+
+ for name in self.field_order:
+ vals.append(int(data[name].value))
+
+ packed_data = self.DATA + source_id
+ packed_data += self.END_OF_SOURCE_ID
+ packed_data += struct.pack(self.pack_fmt, *vals)
+
+ return result + packed_data
+
+
+class StructSerializerRecv(ISensortResultsSerializer):
+ def __init__(self):
+ self.fields = {}
+ self.formats = {}
def unpack(self, data):
code = data[0]
- data = data[1:]
- source_id, _, packed_data = data.partition("\x00")
- config = self.configs.setdefault(source_id,
- StructSerializer.LocalConfig())
- unpacked_data = {"source_id":source_id}
+ source_id, _, packed_data = data[1:].partition(
+ StructSerializerSend.END_OF_SOURCE_ID)
- if code == "\x00":
+ if code == StructSerializerSend.HEADERS:
# fields order provided
- flen = struct.unpack("!H", packed_data[:2])
- forder = packed_data[2:].split("\n")
- if len(forder) != flen:
- return unpacked_data
- config.field_order = forder
- return unpacked_data
+ flen_sz = struct.calcsize("!H")
+ flen = struct.unpack("!H", packed_data[:flen_sz])[0]
+ headers_data, rest = packed_data[flen_sz:].split(
+ StructSerializerSend.END_OF_HEADERS, 1)
+
+ forder = headers_data.split(
+ StructSerializerSend.HEADERS_SEPARATOR)
+
+ assert len(forder) == flen, \
+ "Wrong len {0} != {1}".format(len(forder), flen)
+
+ if source_id in self.fields:  # look up the sender's id, not the literal string 'source_id'
+ assert self.fields[source_id] == ['time'] + forder,\
+ "New field order"
+ else:
+ self.fields[source_id] = ['time'] + forder
+ self.formats[source_id] = "!I" + "I" * flen
+
+ if len(rest) != 0:
+ return self.unpack(rest)
+ return None
else:
- # data provided
- # try to find fields_order
- if config.field_order is None:
- raise ValueError("No fields order provided"
- " for {0}, cannot unpack".format(source_id))
+ assert code == StructSerializerSend.DATA,\
+ "Unknown code {0!r}".format(code)
- val_size = 4
- if len(packed_data) % val_size != 0:
- raise ValueError("Bad packet received"
- " from {0}, cannot unpack".format(source_id))
- datalen = len(packed_data) / val_size
- pack_fmt = "!" + ("I" * datalen)
- vals = struct.unpack(pack_fmt, packed_data)
+ try:
+ fields = self.fields[source_id]
+ except KeyError:
+ raise CantUnpack("No fields order provided"
+ " for {0} yet".format(source_id))
+ s_format = self.formats[source_id]
- unpacked_data['time'] = vals[0]
- i = 1
- for field in config.field_order:
- data[field] = vals[i]
- i += 1
- return data
+ exp_size = struct.calcsize(s_format)
+ assert len(packed_data) == exp_size, \
+ "Wrong data len {0} != {1}".format(len(packed_data), exp_size)
+
+ vals = struct.unpack(s_format, packed_data)
+ res = dict(zip(fields, vals))
+ res['source_id'] = source_id
+ return res
class PickleSerializer(ISensortResultsSerializer):
@@ -215,8 +244,14 @@
return StdoutTransport(receiver)
elif parsed_uri.scheme == 'udp':
ip, port = parsed_uri.netloc.split(":")
+
+ if receiver:
+ packer_cls = StructSerializerRecv
+ else:
+ packer_cls = StructSerializerSend
+
return UDPTransport(receiver, ip=ip, port=int(port),
- packer_cls=StructSerializer)
+ packer_cls=packer_cls)
elif parsed_uri.scheme == 'file':
return FileTransport(receiver, parsed_uri.path)
else: