THRIFT-4372 Pipe write operations across a network are limited to 65,535 bytes per write
Client: Delphi, C#
Patch: Jens Geyer
This closes #1402
diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index 9b7f842..aace4bb 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -327,51 +327,70 @@
procedure TPipeStreamBase.WriteDirect( const pBuf : Pointer; offset: Integer; count: Integer);
-var cbWritten : DWORD;
+var cbWritten, nBytes : DWORD;
begin
if not IsOpen
then raise TTransportExceptionNotOpen.Create('Called write on non-open pipe');
- if not WriteFile( FPipe, PByteArray(pBuf)^[offset], count, cbWritten, nil)
- then raise TTransportExceptionNotOpen.Create('Write to pipe failed');
+ // if necessary, send the data in chunks
+ // there's a system limit of 65,535 ($FFFF) bytes per write that we hit otherwise
+ // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+ nBytes := Min( 15*4096, count); // 16 would exceed the limit
+ while nBytes > 0 do begin
+ if not WriteFile( FPipe, PByteArray(pBuf)^[offset], nBytes, cbWritten, nil)
+ then raise TTransportExceptionNotOpen.Create('Write to pipe failed');
+
+ Inc( offset, cbWritten);
+ Dec( count, cbWritten);
+ nBytes := Min( nBytes, count);
+ end;
end;
procedure TPipeStreamBase.WriteOverlapped( const pBuf : Pointer; offset: Integer; count: Integer);
-var cbWritten, dwWait, dwError : DWORD;
+var cbWritten, dwWait, dwError, nBytes : DWORD;
overlapped : IOverlappedHelper;
begin
if not IsOpen
then raise TTransportExceptionNotOpen.Create('Called write on non-open pipe');
- overlapped := TOverlappedHelperImpl.Create;
+ // if necessary, send the data in chunks
+ // there's a system limit of 65,535 ($FFFF) bytes per write that we hit otherwise
+ // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+ nBytes := Min( 15*4096, count); // 16 would exceed the limit
+ while nBytes > 0 do begin
+ overlapped := TOverlappedHelperImpl.Create;
+ if not WriteFile( FPipe, PByteArray(pBuf)^[offset], nBytes, cbWritten, overlapped.OverlappedPtr)
+ then begin
+ dwError := GetLastError;
+ case dwError of
+ ERROR_IO_PENDING : begin
+ dwWait := overlapped.WaitFor(FTimeout);
- if not WriteFile( FPipe, PByteArray(pBuf)^[offset], count, cbWritten, overlapped.OverlappedPtr)
- then begin
- dwError := GetLastError;
- case dwError of
- ERROR_IO_PENDING : begin
- dwWait := overlapped.WaitFor(FTimeout);
+ if (dwWait = WAIT_TIMEOUT)
+ then raise TTransportExceptionTimedOut.Create('Pipe write timed out');
- if (dwWait = WAIT_TIMEOUT)
- then raise TTransportExceptionTimedOut.Create('Pipe write timed out');
+ if (dwWait <> WAIT_OBJECT_0)
+ or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbWritten, TRUE)
+ then raise TTransportExceptionUnknown.Create('Pipe write error');
+ end;
- if (dwWait <> WAIT_OBJECT_0)
- or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbWritten, TRUE)
- then raise TTransportExceptionUnknown.Create('Pipe write error');
+ else
+ raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
end;
-
- else
- raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
end;
- end;
- ASSERT( DWORD(count) = cbWritten);
+ ASSERT( DWORD(nBytes) = cbWritten);
+
+ Inc( offset, cbWritten);
+ Dec( count, cbWritten);
+ nBytes := Min( nBytes, count);
+ end;
end;
function TPipeStreamBase.ReadDirect( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
-var cbRead, dwErr : DWORD;
+var cbRead, dwErr, nRemaining : DWORD;
bytes, retries : LongInt;
bOk : Boolean;
const INTERVAL = 10; // ms
@@ -406,48 +425,61 @@
end;
end;
- // read the data (or block INFINITE-ly)
- bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], count, cbRead, nil);
- if (not bOk) and (GetLastError() <> ERROR_MORE_DATA)
- then result := 0 // No more data, possibly because client disconnected.
- else result := cbRead;
+ result := 0;
+ nRemaining := count;
+ while nRemaining > 0 do begin
+ // read the data (or block INFINITE-ly)
+ bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], nRemaining, cbRead, nil);
+ if (not bOk) and (GetLastError() <> ERROR_MORE_DATA)
+ then Break; // No more data, possibly because client disconnected.
+
+ Dec( nRemaining, cbRead);
+ Inc( offset, cbRead);
+ Inc( result, cbRead);
+ end;
end;
function TPipeStreamBase.ReadOverlapped( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
-var cbRead, dwWait, dwError : DWORD;
+var cbRead, dwWait, dwError, nRemaining : DWORD;
bOk : Boolean;
overlapped : IOverlappedHelper;
begin
if not IsOpen
then raise TTransportExceptionNotOpen.Create('Called read on non-open pipe');
- overlapped := TOverlappedHelperImpl.Create;
+ result := 0;
+ nRemaining := count;
+ while nRemaining > 0 do begin
+ overlapped := TOverlappedHelperImpl.Create;
- // read the data
- bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], count, cbRead, overlapped.OverlappedPtr);
- if not bOk then begin
- dwError := GetLastError;
- case dwError of
- ERROR_IO_PENDING : begin
- dwWait := overlapped.WaitFor(FTimeout);
+ // read the data
+ bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], nRemaining, cbRead, overlapped.OverlappedPtr);
+ if not bOk then begin
+ dwError := GetLastError;
+ case dwError of
+ ERROR_IO_PENDING : begin
+ dwWait := overlapped.WaitFor(FTimeout);
- if (dwWait = WAIT_TIMEOUT)
- then raise TTransportExceptionTimedOut.Create('Pipe read timed out');
+ if (dwWait = WAIT_TIMEOUT)
+ then raise TTransportExceptionTimedOut.Create('Pipe read timed out');
- if (dwWait <> WAIT_OBJECT_0)
- or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbRead, TRUE)
- then raise TTransportExceptionUnknown.Create('Pipe read error');
+ if (dwWait <> WAIT_OBJECT_0)
+ or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbRead, TRUE)
+ then raise TTransportExceptionUnknown.Create('Pipe read error');
+ end;
+
+ else
+ raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
end;
-
- else
- raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
end;
- end;
- ASSERT( cbRead > 0); // see TTransportImpl.ReadAll()
- ASSERT( cbRead = DWORD(count));
- result := cbRead;
+ ASSERT( cbRead > 0); // see TTransportImpl.ReadAll()
+ ASSERT( cbRead <= DWORD(nRemaining));
+ Dec( nRemaining, cbRead);
+ Inc( offset, cbRead);
+ Inc( result, cbRead);
+ end;
end;
diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas
index 37d8546..59b2a66 100644
--- a/lib/delphi/test/TestClient.pas
+++ b/lib/delphi/test/TestClient.pas
@@ -86,7 +86,7 @@
procedure ClientTest;
procedure JSONProtocolReadWriteTest;
- function PrepareBinaryData( aRandomDist : Boolean = FALSE) : TBytes;
+ function PrepareBinaryData( aRandomDist, aHuge : Boolean) : TBytes;
{$IFDEF StressTest}
procedure StressTest(const client : TThriftTest.Iface);
{$ENDIF}
@@ -546,8 +546,21 @@
i64 := client.testI64(-34359738368);
Expect( i64 = -34359738368, 'testI64(-34359738368) = ' + IntToStr( i64));
- // random binary
- binOut := PrepareBinaryData( TRUE);
+ // random binary small
+ binOut := PrepareBinaryData( TRUE, FALSE);
+ Console.WriteLine('testBinary('+BytesToHex(binOut)+')');
+ try
+ binIn := client.testBinary(binOut);
+ Expect( Length(binOut) = Length(binIn), 'testBinary(): length '+IntToStr(Length(binOut))+' = '+IntToStr(Length(binIn)));
+ i32 := Min( Length(binOut), Length(binIn));
+ Expect( CompareMem( binOut, binIn, i32), 'testBinary('+BytesToHex(binOut)+') = '+BytesToHex(binIn));
+ except
+ on e:TApplicationException do Console.WriteLine('testBinary(): '+e.Message);
+ on e:Exception do Expect( FALSE, 'testBinary(): Unexpected exception "'+e.ClassName+'": '+e.Message);
+ end;
+
+ // random binary huge
+ binOut := PrepareBinaryData( TRUE, TRUE);
Console.WriteLine('testBinary('+BytesToHex(binOut)+')');
try
binIn := client.testBinary(binOut);
@@ -1011,10 +1024,12 @@
{$ENDIF}
-function TClientThread.PrepareBinaryData( aRandomDist : Boolean = FALSE) : TBytes;
-var i, nextPos : Integer;
+function TClientThread.PrepareBinaryData( aRandomDist, aHuge : Boolean) : TBytes;
+var i : Integer;
begin
- SetLength( result, $100);
+ if aHuge
+ then SetLength( result, $12345) // tests for THRIFT-4372
+ else SetLength( result, $100);
ASSERT( Low(result) = 0);
// linear distribution, unless random is requested
@@ -1027,13 +1042,8 @@
-// random distribution of all 256 values
+// independent random byte values (not guaranteed to contain all 256 values)
FillChar( result[0], Length(result) * SizeOf(result[0]), $0);
- i := 1;
- while i < Length(result) do begin
- nextPos := Byte( Random($100));
- if result[nextPos] = 0 then begin // unused?
- result[nextPos] := i;
- Inc(i);
- end;
+ for i := Low(result) to High(result) do begin
+ result[i] := Byte( Random($100));
end;
end;
@@ -1080,7 +1090,7 @@
StartTestGroup( 'JsonProtocolTest', test_Unknown);
// prepare binary data
- binary := PrepareBinaryData( FALSE);
+ binary := PrepareBinaryData( FALSE, FALSE);
SetLength( emptyBinary, 0); // empty binary data block
// output setup