THRIFT-4187 Allow dart framed transport to read incomplete frame
Client: dart
This closes #1269
diff --git a/lib/dart/lib/src/transport/t_framed_transport.dart b/lib/dart/lib/src/transport/t_framed_transport.dart
index 80ccf2c..2ef03f7 100644
--- a/lib/dart/lib/src/transport/t_framed_transport.dart
+++ b/lib/dart/lib/src/transport/t_framed_transport.dart
@@ -25,7 +25,14 @@
final TTransport _transport;
- final Uint8List headerBytes = new Uint8List(headerByteCount);
+ final Uint8List _headerBytes = new Uint8List(headerByteCount);
+ int _receivedHeaderBytes = 0;
+
+ int _bodySize = 0;
+ Uint8List _body = null;
+ int _receivedBodyBytes = 0;
+
+ Completer<Uint8List> _frameCompleter = null;
TFramedTransport(TTransport transport) : _transport = transport {
if (transport == null) {
@@ -51,33 +58,112 @@
if (got > 0) return got;
}
- _readFrame();
+ // IMPORTANT: by the time you've got here,
+ // an entire frame is available for reading
return super.read(buffer, offset, length);
}
void _readFrame() {
- _transport.readAll(headerBytes, 0, headerByteCount);
- int size = headerBytes.buffer.asByteData().getUint32(0);
-
- if (size < 0) {
- throw new TTransportError(
- TTransportErrorType.UNKNOWN, "Read a negative frame size: $size");
+ if (_body == null) {
+ bool gotFullHeader = _readFrameHeader();
+ if (!gotFullHeader) {
+ return;
+ }
}
- Uint8List buffer = new Uint8List(size);
- _transport.readAll(buffer, 0, size);
- _setReadBuffer(buffer);
+ _readFrameBody();
+ }
+
+ bool _readFrameHeader() {
+ var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;
+
+ int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
+ if (got < 0) {
+ throw new TTransportError(
+ TTransportErrorType.UNKNOWN, "Socket closed during frame header read");
+ }
+
+ _receivedHeaderBytes += got;
+
+ if (_receivedHeaderBytes == headerByteCount) {
+ int size = _headerBytes.buffer.asByteData().getUint32(0);
+
+ _receivedHeaderBytes = 0;
+
+ if (size < 0) {
+ throw new TTransportError(
+ TTransportErrorType.UNKNOWN, "Read a negative frame size: $size");
+ }
+
+ _bodySize = size;
+ _body = new Uint8List(_bodySize);
+ _receivedBodyBytes = 0;
+
+ return true;
+ } else {
+ _registerForReadableBytes();
+ return false;
+ }
+ }
+
+ void _readFrameBody() {
+ var remainingBodyBytes = _bodySize - _receivedBodyBytes;
+
+ int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes);
+ if (got < 0) {
+ throw new TTransportError(
+ TTransportErrorType.UNKNOWN, "Socket closed during frame body read");
+ }
+
+ _receivedBodyBytes += got;
+
+ if (_receivedBodyBytes == _bodySize) {
+ var body = _body;
+
+ _bodySize = 0;
+ _body = null;
+ _receivedBodyBytes = 0;
+
+ _setReadBuffer(body);
+
+ var completer = _frameCompleter;
+ _frameCompleter = null;
+ completer.complete(new Uint8List(0));
+ } else {
+ _registerForReadableBytes();
+ }
}
Future flush() {
- Uint8List buffer = consumeWriteBuffer();
- int length = buffer.length;
+ if (_frameCompleter == null) {
+ Uint8List buffer = consumeWriteBuffer();
+ int length = buffer.length;
- headerBytes.buffer.asByteData().setUint32(0, length);
- _transport.write(headerBytes, 0, headerByteCount);
- _transport.write(buffer, 0, length);
+ _headerBytes.buffer.asByteData().setUint32(0, length);
+ _transport.write(_headerBytes, 0, headerByteCount);
+ _transport.write(buffer, 0, length);
- return _transport.flush();
+ _frameCompleter = new Completer<Uint8List>();
+ _registerForReadableBytes();
+ }
+
+ return _frameCompleter.future;
+ }
+
+ void _registerForReadableBytes() {
+ _transport.flush().then((_) {
+ _readFrame();
+ }).catchError((e) {
+ var completer = _frameCompleter;
+
+ _receivedHeaderBytes = 0;
+ _bodySize = 0;
+ _body = null;
+ _receivedBodyBytes = 0;
+ _frameCompleter = null;
+
+ completer.completeError(e);
+ });
}
}
diff --git a/lib/dart/lib/src/transport/t_socket_transport.dart b/lib/dart/lib/src/transport/t_socket_transport.dart
index 8dcdfde..c41374a 100644
--- a/lib/dart/lib/src/transport/t_socket_transport.dart
+++ b/lib/dart/lib/src/transport/t_socket_transport.dart
@@ -79,7 +79,9 @@
var completer = new Completer<Uint8List>.sync();
_completers.add(completer);
- socket.send(bytes);
+ if (bytes.lengthInBytes > 0) {
+ socket.send(bytes);
+ }
return completer.future;
}
diff --git a/lib/dart/test/transport/t_framed_transport_test.dart b/lib/dart/test/transport/t_framed_transport_test.dart
new file mode 100644
index 0000000..e072e68
--- /dev/null
+++ b/lib/dart/test/transport/t_framed_transport_test.dart
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+library thrift.test.transport.t_framed_transport_test;
+
+import 'dart:async';
+import 'dart:convert';
+import 'dart:typed_data' show Uint8List;
+
+import 'package:test/test.dart';
+import 'package:thrift/thrift.dart';
+
+void main() {
+ group('TFramedTransport partial reads', () {
+ final flushAwaitDuration = new Duration(seconds: 10);
+
+ FakeReadOnlySocket socket;
+ TSocketTransport socketTransport;
+ TFramedTransport transport;
+ var messageAvailable;
+
+ setUp(() {
+ socket = new FakeReadOnlySocket();
+ socketTransport = new TClientSocketTransport(socket);
+ transport = new TFramedTransport(socketTransport);
+ messageAvailable = false;
+ });
+
+ expectNoReadableBytes() {
+ var readBuffer = new Uint8List(128);
+ var readBytes = transport.read(readBuffer, 0, readBuffer.lengthInBytes);
+ expect(readBytes, 0);
+ expect(messageAvailable, false);
+ }
+
+ test('Test transport reads messages where header and body are sent separately', () async {
+ // buffer into which we'll read
+ var readBuffer = new Uint8List(10);
+ var readBytes;
+
+ // registers for readable bytes
+ var flushFuture = transport.flush().timeout(flushAwaitDuration);
+ flushFuture.then((_) {
+ messageAvailable = true;
+ });
+
+ // write header bytes
+ socket.messageController.add(new Uint8List.fromList([0x00, 0x00, 0x00, 0x06]));
+
+ // you shouldn't be able to get any bytes from the read,
+ // because the header has been consumed internally
+ expectNoReadableBytes();
+
+ // write first batch of body
+ socket.messageController.add(new Uint8List.fromList(UTF8.encode("He")));
+
+ // you shouldn't be able to get any bytes from the read,
+ // because the frame has been consumed internally
+ expectNoReadableBytes();
+
+ // write second batch of body
+ socket.messageController.add(new Uint8List.fromList(UTF8.encode("llo!")));
+
+ // have to wait for the flush to complete,
+ // because it's only then that the frame is available for reading
+ await flushFuture;
+ expect(messageAvailable, true);
+
+ // at this point the frame is complete, so we expect the read to complete
+ readBytes = transport.read(readBuffer, 0, readBuffer.lengthInBytes);
+ expect(readBytes, 6);
+ expect(readBuffer.sublist(0, 6), UTF8.encode("Hello!"));
+ });
+
+ test('Test transport reads messages where header is sent in pieces '
+ 'and body is also sent in pieces', () async {
+ // buffer into which we'll read
+ var readBuffer = new Uint8List(10);
+ var readBytes;
+
+ // registers for readable bytes
+ var flushFuture = transport.flush().timeout(flushAwaitDuration);
+ flushFuture.then((_) {
+ messageAvailable = true;
+ });
+
+ // write first part of header bytes
+ socket.messageController.add(new Uint8List.fromList([0x00, 0x00]));
+
+ // you shouldn't be able to get any bytes from the read
+ expectNoReadableBytes();
+
+ // write second part of header bytes
+ socket.messageController.add(new Uint8List.fromList([0x00, 0x03]));
+
+ // you shouldn't be able to get any bytes from the read again
+ // because only the header was read, and there's no frame body
+ readBytes = expectNoReadableBytes();
+
+ // write first batch of body
+ socket.messageController.add(new Uint8List.fromList(UTF8.encode("H")));
+
+ // you shouldn't be able to get any bytes from the read,
+ // because the frame has been consumed internally
+ expectNoReadableBytes();
+
+ // write second batch of body
+ socket.messageController.add(new Uint8List.fromList(UTF8.encode("i!")));
+
+ // have to wait for the flush to complete,
+ // because it's only then that the frame is available for reading
+ await flushFuture;
+ expect(messageAvailable, true);
+
+ // at this point the frame is complete, so we expect the read to complete
+ readBytes = transport.read(readBuffer, 0, readBuffer.lengthInBytes);
+ expect(readBytes, 3);
+ expect(readBuffer.sublist(0, 3), UTF8.encode("Hi!"));
+ });
+ });
+}
+
+
+
+class FakeReadOnlySocket extends TSocket {
+
+ StreamController<Uint8List> messageController = new StreamController<Uint8List>(sync: true);
+ StreamController<Object> errorController = new StreamController<Object>();
+ StreamController<TSocketState> stateController = new StreamController<TSocketState>();
+
+ @override
+ Future close() {
+ // noop
+ }
+
+ @override
+ bool get isClosed => false;
+
+ @override
+ bool get isOpen => true;
+
+ @override
+ Stream<Object> get onError => errorController.stream;
+
+ @override
+ Stream<Uint8List> get onMessage => messageController.stream;
+
+ @override
+ Stream<TSocketState> get onState => stateController.stream;
+
+ @override
+ Future open() {
+ // noop
+ }
+
+ @override
+ void send(Uint8List data) {
+ // noop
+ }
+}
+
diff --git a/test/known_failures_Linux.json b/test/known_failures_Linux.json
index a8d00ec..a886098 100644
--- a/test/known_failures_Linux.json
+++ b/test/known_failures_Linux.json
@@ -114,11 +114,8 @@
"d-d_binary_http-ip",
"d-d_compact_http-ip",
"d-d_json_http-ip",
- "d-dart_binary_framed-ip",
"d-dart_binary_http-ip",
- "d-dart_compact_framed-ip",
"d-dart_compact_http-ip",
- "d-dart_json_framed-ip",
"d-dart_json_http-ip",
"d-go_binary_http-ip",
"d-go_binary_http-ip-ssl",
@@ -227,9 +224,6 @@
"go-nodejs_json_framed-ip",
"hs-csharp_binary_framed-ip",
"hs-csharp_compact_framed-ip",
- "hs-dart_binary_framed-ip",
- "hs-dart_compact_framed-ip",
- "hs-dart_json_framed-ip",
"java-d_compact_buffered-ip",
"java-d_compact_buffered-ip-ssl",
"java-d_compact_framed-ip",
@@ -251,14 +245,8 @@
"nodejs-d_compact_http-ip-ssl",
"nodejs-d_json_http-ip",
"nodejs-d_json_http-ip-ssl",
- "nodejs-dart_binary_buffered-ip",
- "nodejs-dart_binary_framed-ip",
"nodejs-dart_binary_http-ip",
- "nodejs-dart_compact_buffered-ip",
- "nodejs-dart_compact_framed-ip",
"nodejs-dart_compact_http-ip",
- "nodejs-dart_json_buffered-ip",
- "nodejs-dart_json_framed-ip",
"nodejs-dart_json_http-ip",
"nodejs-go_binary_http-ip",
"nodejs-go_binary_http-ip-ssl",
@@ -306,9 +294,5 @@
"rs-cpp_multic-compact_buffered-ip",
"rs-cpp_multic-compact_framed-ip",
"rs-cpp_multic_buffered-ip",
- "rs-cpp_multic_framed-ip",
- "rs-dart_binary_framed-ip",
- "rs-dart_compact_framed-ip",
- "rs-dart_multi-binary_framed-ip",
- "rs-dart_multic-compact_framed-ip"
+ "rs-cpp_multic_framed-ip"
]