THRIFT-4372 Pipe write operations across a network are limited to 65,535 bytes per write
Client: Delphi, C#
Patch: Jens Geyer

This closes #1402
diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index 9b7f842..aace4bb 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -327,51 +327,70 @@
 
 
 procedure TPipeStreamBase.WriteDirect( const pBuf : Pointer; offset: Integer; count: Integer);
-var cbWritten : DWORD;
+var cbWritten, nBytes : DWORD;
 begin
   if not IsOpen
   then raise TTransportExceptionNotOpen.Create('Called write on non-open pipe');
 
-  if not WriteFile( FPipe, PByteArray(pBuf)^[offset], count, cbWritten, nil)
-  then raise TTransportExceptionNotOpen.Create('Write to pipe failed');
+  // if necessary, send the data in chunks
+  // there's a system limit around 0x10000 bytes that we hit otherwise
+  // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+  nBytes := Min( 15*4096, count); // 16 would exceed the limit
+  while nBytes > 0 do begin
+    if not WriteFile( FPipe, PByteArray(pBuf)^[offset], nBytes, cbWritten, nil)
+    then raise TTransportExceptionNotOpen.Create('Write to pipe failed');
+
+    Inc( offset, cbWritten);
+    Dec( count, cbWritten);
+    nBytes := Min( nBytes, count);
+  end;
 end;
 
 
 procedure TPipeStreamBase.WriteOverlapped( const pBuf : Pointer; offset: Integer; count: Integer);
-var cbWritten, dwWait, dwError : DWORD;
+var cbWritten, dwWait, dwError, nBytes : DWORD;
     overlapped : IOverlappedHelper;
 begin
   if not IsOpen
   then raise TTransportExceptionNotOpen.Create('Called write on non-open pipe');
 
-  overlapped := TOverlappedHelperImpl.Create;
+  // if necessary, send the data in chunks
+  // there's a system limit around 0x10000 bytes that we hit otherwise
+  // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+  nBytes := Min( 15*4096, count); // 16 would exceed the limit
+  while nBytes > 0 do begin
+    overlapped := TOverlappedHelperImpl.Create;
+    if not WriteFile( FPipe, PByteArray(pBuf)^[offset], nBytes, cbWritten, overlapped.OverlappedPtr)
+    then begin
+      dwError := GetLastError;
+      case dwError of
+        ERROR_IO_PENDING : begin
+          dwWait := overlapped.WaitFor(FTimeout);
 
-  if not WriteFile( FPipe, PByteArray(pBuf)^[offset], count, cbWritten, overlapped.OverlappedPtr)
-  then begin
-    dwError := GetLastError;
-    case dwError of
-      ERROR_IO_PENDING : begin
-        dwWait := overlapped.WaitFor(FTimeout);
+          if (dwWait = WAIT_TIMEOUT)
+          then raise TTransportExceptionTimedOut.Create('Pipe write timed out');
 
-        if (dwWait = WAIT_TIMEOUT)
-        then raise TTransportExceptionTimedOut.Create('Pipe write timed out');
+          if (dwWait <> WAIT_OBJECT_0)
+          or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbWritten, TRUE)
+          then raise TTransportExceptionUnknown.Create('Pipe write error');
+        end;
 
-        if (dwWait <> WAIT_OBJECT_0)
-        or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbWritten, TRUE)
-        then raise TTransportExceptionUnknown.Create('Pipe write error');
+      else
+        raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
       end;
-
-    else
-      raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
     end;
-  end;
 
-  ASSERT( DWORD(count) = cbWritten);
+    ASSERT( DWORD(nBytes) = cbWritten);
+
+    Inc( offset, cbWritten);
+    Dec( count, cbWritten);
+    nBytes := Min( nBytes, count);
+  end;
 end;
 
 
 function TPipeStreamBase.ReadDirect(     const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
-var cbRead, dwErr  : DWORD;
+var cbRead, dwErr, nRemaining  : DWORD;
     bytes, retries  : LongInt;
     bOk     : Boolean;
 const INTERVAL = 10;  // ms
@@ -406,48 +425,61 @@
     end;
   end;
 
-  // read the data (or block INFINITE-ly)
-  bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], count, cbRead, nil);
-  if (not bOk) and (GetLastError() <> ERROR_MORE_DATA)
-  then result := 0 // No more data, possibly because client disconnected.
-  else result := cbRead;
+  result := 0;
+  nRemaining := count;
+  while nRemaining > 0 do begin
+    // read the data (or block INFINITE-ly)
+    bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], nRemaining, cbRead, nil);
+    if (not bOk) and (GetLastError() <> ERROR_MORE_DATA)
+    then Break; // No more data, possibly because client disconnected.
+
+    Dec( nRemaining, cbRead);
+    Inc( offset, cbRead);
+    Inc( result, cbRead);
+  end;
 end;
 
 
 function TPipeStreamBase.ReadOverlapped( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
-var cbRead, dwWait, dwError  : DWORD;
+var cbRead, dwWait, dwError, nRemaining : DWORD;
     bOk     : Boolean;
     overlapped : IOverlappedHelper;
 begin
   if not IsOpen
   then raise TTransportExceptionNotOpen.Create('Called read on non-open pipe');
 
-  overlapped := TOverlappedHelperImpl.Create;
+  result := 0;
+  nRemaining := count;
+  while nRemaining > 0 do begin
+    overlapped := TOverlappedHelperImpl.Create;
 
-  // read the data
-  bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], count, cbRead, overlapped.OverlappedPtr);
-  if not bOk then begin
-    dwError := GetLastError;
-    case dwError of
-      ERROR_IO_PENDING : begin
-        dwWait := overlapped.WaitFor(FTimeout);
+     // read the data
+    bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], nRemaining, cbRead, overlapped.OverlappedPtr);
+    if not bOk then begin
+      dwError := GetLastError;
+      case dwError of
+        ERROR_IO_PENDING : begin
+          dwWait := overlapped.WaitFor(FTimeout);
 
-        if (dwWait = WAIT_TIMEOUT)
-        then raise TTransportExceptionTimedOut.Create('Pipe read timed out');
+          if (dwWait = WAIT_TIMEOUT)
+          then raise TTransportExceptionTimedOut.Create('Pipe read timed out');
 
-        if (dwWait <> WAIT_OBJECT_0)
-        or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbRead, TRUE)
-        then raise TTransportExceptionUnknown.Create('Pipe read error');
+          if (dwWait <> WAIT_OBJECT_0)
+          or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbRead, TRUE)
+          then raise TTransportExceptionUnknown.Create('Pipe read error');
+        end;
+
+      else
+        raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
       end;
-
-    else
-      raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
     end;
-  end;
 
-  ASSERT( cbRead > 0);  // see TTransportImpl.ReadAll()
-  ASSERT( cbRead = DWORD(count));
-  result := cbRead;
+    ASSERT( cbRead > 0);  // see TTransportImpl.ReadAll()
+    ASSERT( cbRead <= DWORD(nRemaining));
+    Dec( nRemaining, cbRead);
+    Inc( offset, cbRead);
+    Inc( result, cbRead);
+  end;
 end;
 
 
diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas
index 37d8546..59b2a66 100644
--- a/lib/delphi/test/TestClient.pas
+++ b/lib/delphi/test/TestClient.pas
@@ -86,7 +86,7 @@
 
     procedure ClientTest;
     procedure JSONProtocolReadWriteTest;
-    function  PrepareBinaryData( aRandomDist : Boolean = FALSE) : TBytes;
+    function  PrepareBinaryData( aRandomDist, aHuge : Boolean) : TBytes;
     {$IFDEF StressTest}
     procedure StressTest(const client : TThriftTest.Iface);
     {$ENDIF}
@@ -546,8 +546,21 @@
   i64 := client.testI64(-34359738368);
   Expect( i64 = -34359738368, 'testI64(-34359738368) = ' + IntToStr( i64));
 
-  // random binary
-  binOut := PrepareBinaryData( TRUE);
+  // random binary small
+  binOut := PrepareBinaryData( TRUE, FALSE);
+  Console.WriteLine('testBinary('+BytesToHex(binOut)+')');
+  try
+    binIn := client.testBinary(binOut);
+    Expect( Length(binOut) = Length(binIn), 'testBinary(): length '+IntToStr(Length(binOut))+' = '+IntToStr(Length(binIn)));
+    i32 := Min( Length(binOut), Length(binIn));
+    Expect( CompareMem( binOut, binIn, i32), 'testBinary('+BytesToHex(binOut)+') = '+BytesToHex(binIn));
+  except
+    on e:TApplicationException do Console.WriteLine('testBinary(): '+e.Message);
+    on e:Exception do Expect( FALSE, 'testBinary(): Unexpected exception "'+e.ClassName+'": '+e.Message);
+  end;
+
+  // random binary huge
+  binOut := PrepareBinaryData( TRUE, TRUE);
   Console.WriteLine('testBinary('+BytesToHex(binOut)+')');
   try
     binIn := client.testBinary(binOut);
@@ -1011,10 +1024,12 @@
 {$ENDIF}
 
 
-function TClientThread.PrepareBinaryData( aRandomDist : Boolean = FALSE) : TBytes;
-var i, nextPos : Integer;
+function TClientThread.PrepareBinaryData( aRandomDist, aHuge : Boolean) : TBytes;
+var i : Integer;
 begin
-  SetLength( result, $100);
+  if aHuge
+  then SetLength( result, $12345)  // tests for THRIFT-4372
+  else SetLength( result, $100);
   ASSERT( Low(result) = 0);
 
   // linear distribution, unless random is requested
@@ -1027,13 +1042,8 @@
 
   // random distribution of all 256 values
   FillChar( result[0], Length(result) * SizeOf(result[0]), $0);
-  i := 1;
-  while i < Length(result) do begin
-    nextPos := Byte( Random($100));
-    if result[nextPos] = 0 then begin  // unused?
-      result[nextPos] := i;
-      Inc(i);
-    end;
+  for i := Low(result) to High(result) do begin
+    result[i] := Byte( Random($100));
   end;
 end;
 
@@ -1080,7 +1090,7 @@
     StartTestGroup( 'JsonProtocolTest', test_Unknown);
 
     // prepare binary data
-    binary := PrepareBinaryData( FALSE);
+    binary := PrepareBinaryData( FALSE, FALSE);
     SetLength( emptyBinary, 0); // empty binary data block
 
     // output setup