THRIFT-4187 Allow dart framed transport to read incomplete frame
Client: dart

This closes #1269
diff --git a/lib/dart/lib/src/transport/t_framed_transport.dart b/lib/dart/lib/src/transport/t_framed_transport.dart
index 80ccf2c..2ef03f7 100644
--- a/lib/dart/lib/src/transport/t_framed_transport.dart
+++ b/lib/dart/lib/src/transport/t_framed_transport.dart
@@ -25,7 +25,14 @@
 
   final TTransport _transport;
 
-  final Uint8List headerBytes = new Uint8List(headerByteCount);
+  final Uint8List _headerBytes = new Uint8List(headerByteCount);
+  int _receivedHeaderBytes = 0;
+
+  int _bodySize = 0;
+  Uint8List _body = null;
+  int _receivedBodyBytes = 0;
+
+  Completer<Uint8List> _frameCompleter = null;
 
   TFramedTransport(TTransport transport) : _transport = transport {
     if (transport == null) {
@@ -51,33 +58,112 @@
       if (got > 0) return got;
     }
 
-    _readFrame();
+    // IMPORTANT: by the time you've got here,
+    // an entire frame is available for reading
 
     return super.read(buffer, offset, length);
   }
 
   void _readFrame() {
-    _transport.readAll(headerBytes, 0, headerByteCount);
-    int size = headerBytes.buffer.asByteData().getUint32(0);
-
-    if (size < 0) {
-      throw new TTransportError(
-          TTransportErrorType.UNKNOWN, "Read a negative frame size: $size");
+    if (_body == null) {
+      bool gotFullHeader = _readFrameHeader();
+      if (!gotFullHeader) {
+        return;
+      }
     }
 
-    Uint8List buffer = new Uint8List(size);
-    _transport.readAll(buffer, 0, size);
-    _setReadBuffer(buffer);
+    _readFrameBody();
+  }
+
+  bool _readFrameHeader() {
+    var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;
+
+    int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
+    if (got < 0) {
+      throw new TTransportError(
+          TTransportErrorType.UNKNOWN, "Socket closed during frame header read");
+    }
+
+    _receivedHeaderBytes += got;
+
+    if (_receivedHeaderBytes == headerByteCount) {
+      int size = _headerBytes.buffer.asByteData().getUint32(0);
+
+      _receivedHeaderBytes = 0;
+
+      if (size < 0) {
+        throw new TTransportError(
+            TTransportErrorType.UNKNOWN, "Read a negative frame size: $size");
+      }
+
+      _bodySize = size;
+      _body = new Uint8List(_bodySize);
+      _receivedBodyBytes = 0;
+
+      return true;
+    } else {
+      _registerForReadableBytes();
+      return false;
+    }
+  }
+
+  void _readFrameBody() {
+    var remainingBodyBytes = _bodySize - _receivedBodyBytes;
+
+    int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes);
+    if (got < 0) {
+      throw new TTransportError(
+          TTransportErrorType.UNKNOWN, "Socket closed during frame body read");
+    }
+
+    _receivedBodyBytes += got;
+
+    if (_receivedBodyBytes == _bodySize) {
+      var body = _body;
+
+      _bodySize = 0;
+      _body = null;
+      _receivedBodyBytes = 0;
+
+      _setReadBuffer(body);
+
+      var completer = _frameCompleter;
+      _frameCompleter = null;
+      completer.complete(new Uint8List(0));
+    } else {
+      _registerForReadableBytes();
+    }
   }
 
   Future flush() {
-    Uint8List buffer = consumeWriteBuffer();
-    int length = buffer.length;
+    if (_frameCompleter == null) {
+      Uint8List buffer = consumeWriteBuffer();
+      int length = buffer.length;
 
-    headerBytes.buffer.asByteData().setUint32(0, length);
-    _transport.write(headerBytes, 0, headerByteCount);
-    _transport.write(buffer, 0, length);
+      _headerBytes.buffer.asByteData().setUint32(0, length);
+      _transport.write(_headerBytes, 0, headerByteCount);
+      _transport.write(buffer, 0, length);
 
-    return _transport.flush();
+      _frameCompleter  = new Completer<Uint8List>();
+      _registerForReadableBytes();
+    }
+
+    return _frameCompleter.future;
+  }
+
+  void _registerForReadableBytes() {
+    _transport.flush().then((_) {
+      _readFrame();
+    }).catchError((e) {
+      var completer = _frameCompleter;
+
+      _receivedHeaderBytes = 0;
+      _bodySize = 0;
+      _body = null;
+      _receivedBodyBytes = 0;
+      _frameCompleter = null;
+
+      completer.completeError(e);
+    });
   }
 }
diff --git a/lib/dart/lib/src/transport/t_socket_transport.dart b/lib/dart/lib/src/transport/t_socket_transport.dart
index 8dcdfde..c41374a 100644
--- a/lib/dart/lib/src/transport/t_socket_transport.dart
+++ b/lib/dart/lib/src/transport/t_socket_transport.dart
@@ -79,7 +79,9 @@
     var completer = new Completer<Uint8List>.sync();
     _completers.add(completer);
 
-    socket.send(bytes);
+    if (bytes.lengthInBytes > 0) {
+      socket.send(bytes);
+    }
 
     return completer.future;
   }
diff --git a/lib/dart/test/transport/t_framed_transport_test.dart b/lib/dart/test/transport/t_framed_transport_test.dart
new file mode 100644
index 0000000..e072e68
--- /dev/null
+++ b/lib/dart/test/transport/t_framed_transport_test.dart
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+library thrift.test.transport.t_framed_transport_test;
+
+import 'dart:async';
+import 'dart:convert';
+import 'dart:typed_data' show Uint8List;
+
+import 'package:test/test.dart';
+import 'package:thrift/thrift.dart';
+
+void main() {
+  group('TFramedTransport partial reads', () {
+    final flushAwaitDuration = new Duration(seconds: 10);
+
+    FakeReadOnlySocket socket;
+    TSocketTransport socketTransport;
+    TFramedTransport transport;
+    var messageAvailable;
+
+    setUp(() {
+      socket = new FakeReadOnlySocket();
+      socketTransport = new TClientSocketTransport(socket);
+      transport = new TFramedTransport(socketTransport);
+      messageAvailable = false;
+    });
+
+    expectNoReadableBytes() {
+      var readBuffer = new Uint8List(128);
+      var readBytes = transport.read(readBuffer, 0, readBuffer.lengthInBytes);
+      expect(readBytes, 0);
+      expect(messageAvailable, false);
+    }
+
+    test('Test transport reads messages where header and body are sent separately', () async {
+      // buffer into which we'll read
+      var readBuffer = new Uint8List(10);
+      var readBytes;
+
+      // registers for readable bytes
+      var flushFuture = transport.flush().timeout(flushAwaitDuration);
+      flushFuture.then((_) {
+        messageAvailable = true;
+      });
+
+      // write header bytes
+      socket.messageController.add(new Uint8List.fromList([0x00, 0x00, 0x00, 0x06]));
+
+      // you shouldn't be able to get any bytes from the read,
+      // because the header has been consumed internally
+      expectNoReadableBytes();
+
+      // write first batch of body
+      socket.messageController.add(new Uint8List.fromList(UTF8.encode("He")));
+
+      // you shouldn't be able to get any bytes from the read,
+      // because the frame has been consumed internally
+      expectNoReadableBytes();
+
+      // write second batch of body
+      socket.messageController.add(new Uint8List.fromList(UTF8.encode("llo!")));
+
+      // have to wait for the flush to complete,
+      // because it's only then that the frame is available for reading
+      await flushFuture;
+      expect(messageAvailable, true);
+
+      // at this point the frame is complete, so we expect the read to complete
+      readBytes = transport.read(readBuffer, 0, readBuffer.lengthInBytes);
+      expect(readBytes, 6);
+      expect(readBuffer.sublist(0, 6), UTF8.encode("Hello!"));
+    });
+
+    test('Test transport reads messages where header is sent in pieces '
+         'and body is also sent in pieces', () async {
+      // buffer into which we'll read
+      var readBuffer = new Uint8List(10);
+      var readBytes;
+
+      // registers for readable bytes
+      var flushFuture = transport.flush().timeout(flushAwaitDuration);
+      flushFuture.then((_) {
+        messageAvailable = true;
+      });
+
+      // write first part of header bytes
+      socket.messageController.add(new Uint8List.fromList([0x00, 0x00]));
+
+      // you shouldn't be able to get any bytes from the read
+      expectNoReadableBytes();
+
+      // write second part of header bytes
+      socket.messageController.add(new Uint8List.fromList([0x00, 0x03]));
+
+      // you shouldn't be able to get any bytes from the read again
+      // because only the header was read, and there's no frame body
+      readBytes = expectNoReadableBytes();
+
+      // write first batch of body
+      socket.messageController.add(new Uint8List.fromList(UTF8.encode("H")));
+
+      // you shouldn't be able to get any bytes from the read,
+      // because the frame has been consumed internally
+      expectNoReadableBytes();
+
+      // write second batch of body
+      socket.messageController.add(new Uint8List.fromList(UTF8.encode("i!")));
+
+      // have to wait for the flush to complete,
+      // because it's only then that the frame is available for reading
+      await flushFuture;
+      expect(messageAvailable, true);
+
+      // at this point the frame is complete, so we expect the read to complete
+      readBytes = transport.read(readBuffer, 0, readBuffer.lengthInBytes);
+      expect(readBytes, 3);
+      expect(readBuffer.sublist(0, 3), UTF8.encode("Hi!"));
+    });
+  });
+}
+
+
+
+class FakeReadOnlySocket extends TSocket {
+
+  StreamController<Uint8List> messageController = new StreamController<Uint8List>(sync: true);
+  StreamController<Object> errorController = new StreamController<Object>();
+  StreamController<TSocketState> stateController = new StreamController<TSocketState>();
+
+  @override
+  Future close() {
+    // noop
+  }
+
+  @override
+  bool get isClosed => false;
+
+  @override
+  bool get isOpen => true;
+
+  @override
+  Stream<Object> get onError => errorController.stream;
+
+  @override
+  Stream<Uint8List> get onMessage => messageController.stream;
+
+  @override
+  Stream<TSocketState> get onState => stateController.stream;
+
+  @override
+  Future open() {
+    // noop
+  }
+
+  @override
+  void send(Uint8List data) {
+    // noop
+  }
+}
+
diff --git a/test/known_failures_Linux.json b/test/known_failures_Linux.json
index a8d00ec..a886098 100644
--- a/test/known_failures_Linux.json
+++ b/test/known_failures_Linux.json
@@ -114,11 +114,8 @@
   "d-d_binary_http-ip",
   "d-d_compact_http-ip",
   "d-d_json_http-ip",
-  "d-dart_binary_framed-ip",
   "d-dart_binary_http-ip",
-  "d-dart_compact_framed-ip",
   "d-dart_compact_http-ip",
-  "d-dart_json_framed-ip",
   "d-dart_json_http-ip",
   "d-go_binary_http-ip",
   "d-go_binary_http-ip-ssl",
@@ -227,9 +224,6 @@
   "go-nodejs_json_framed-ip",
   "hs-csharp_binary_framed-ip",
   "hs-csharp_compact_framed-ip",
-  "hs-dart_binary_framed-ip",
-  "hs-dart_compact_framed-ip",
-  "hs-dart_json_framed-ip",
   "java-d_compact_buffered-ip",
   "java-d_compact_buffered-ip-ssl",
   "java-d_compact_framed-ip",
@@ -251,14 +245,8 @@
   "nodejs-d_compact_http-ip-ssl",
   "nodejs-d_json_http-ip",
   "nodejs-d_json_http-ip-ssl",
-  "nodejs-dart_binary_buffered-ip",
-  "nodejs-dart_binary_framed-ip",
   "nodejs-dart_binary_http-ip",
-  "nodejs-dart_compact_buffered-ip",
-  "nodejs-dart_compact_framed-ip",
   "nodejs-dart_compact_http-ip",
-  "nodejs-dart_json_buffered-ip",
-  "nodejs-dart_json_framed-ip",
   "nodejs-dart_json_http-ip",
   "nodejs-go_binary_http-ip",
   "nodejs-go_binary_http-ip-ssl",
@@ -306,9 +294,5 @@
   "rs-cpp_multic-compact_buffered-ip",
   "rs-cpp_multic-compact_framed-ip",
   "rs-cpp_multic_buffered-ip",
-  "rs-cpp_multic_framed-ip",
-  "rs-dart_binary_framed-ip",
-  "rs-dart_compact_framed-ip",
-  "rs-dart_multi-binary_framed-ip",
-  "rs-dart_multic-compact_framed-ip"
+  "rs-cpp_multic_framed-ip"
 ]