THRIFT-4372 Pipe write operations across a network are limited to 65,535 bytes per write
Client: Delphi, C#
Patch: Jens Geyer

This closes #1402
diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index 9b7f842..aace4bb 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -327,51 +327,70 @@
 
 
 procedure TPipeStreamBase.WriteDirect( const pBuf : Pointer; offset: Integer; count: Integer);
-var cbWritten : DWORD;
+var cbWritten, nBytes : DWORD;
 begin
   if not IsOpen
   then raise TTransportExceptionNotOpen.Create('Called write on non-open pipe');
 
-  if not WriteFile( FPipe, PByteArray(pBuf)^[offset], count, cbWritten, nil)
-  then raise TTransportExceptionNotOpen.Create('Write to pipe failed');
+  // if necessary, send the data in chunks
+  // there's a system limit around 0x10000 bytes that we hit otherwise
+  // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+  nBytes := Min( 15*4096, count); // 16 would exceed the limit
+  while nBytes > 0 do begin
+    if not WriteFile( FPipe, PByteArray(pBuf)^[offset], nBytes, cbWritten, nil)
+    then raise TTransportExceptionNotOpen.Create('Write to pipe failed');
+
+    Inc( offset, cbWritten);
+    Dec( count, cbWritten);
+    nBytes := Min( nBytes, count);
+  end;
 end;
 
 
 procedure TPipeStreamBase.WriteOverlapped( const pBuf : Pointer; offset: Integer; count: Integer);
-var cbWritten, dwWait, dwError : DWORD;
+var cbWritten, dwWait, dwError, nBytes : DWORD;
     overlapped : IOverlappedHelper;
 begin
   if not IsOpen
   then raise TTransportExceptionNotOpen.Create('Called write on non-open pipe');
 
-  overlapped := TOverlappedHelperImpl.Create;
+  // if necessary, send the data in chunks
+  // there's a system limit around 0x10000 bytes that we hit otherwise
+  // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+  nBytes := Min( 15*4096, count); // 16 would exceed the limit
+  while nBytes > 0 do begin
+    overlapped := TOverlappedHelperImpl.Create;
+    if not WriteFile( FPipe, PByteArray(pBuf)^[offset], nBytes, cbWritten, overlapped.OverlappedPtr)
+    then begin
+      dwError := GetLastError;
+      case dwError of
+        ERROR_IO_PENDING : begin
+          dwWait := overlapped.WaitFor(FTimeout);
 
-  if not WriteFile( FPipe, PByteArray(pBuf)^[offset], count, cbWritten, overlapped.OverlappedPtr)
-  then begin
-    dwError := GetLastError;
-    case dwError of
-      ERROR_IO_PENDING : begin
-        dwWait := overlapped.WaitFor(FTimeout);
+          if (dwWait = WAIT_TIMEOUT)
+          then raise TTransportExceptionTimedOut.Create('Pipe write timed out');
 
-        if (dwWait = WAIT_TIMEOUT)
-        then raise TTransportExceptionTimedOut.Create('Pipe write timed out');
+          if (dwWait <> WAIT_OBJECT_0)
+          or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbWritten, TRUE)
+          then raise TTransportExceptionUnknown.Create('Pipe write error');
+        end;
 
-        if (dwWait <> WAIT_OBJECT_0)
-        or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbWritten, TRUE)
-        then raise TTransportExceptionUnknown.Create('Pipe write error');
+      else
+        raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
       end;
-
-    else
-      raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
     end;
-  end;
 
-  ASSERT( DWORD(count) = cbWritten);
+    ASSERT( DWORD(nBytes) = cbWritten);
+
+    Inc( offset, cbWritten);
+    Dec( count, cbWritten);
+    nBytes := Min( nBytes, count);
+  end;
 end;
 
 
 function TPipeStreamBase.ReadDirect(     const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
-var cbRead, dwErr  : DWORD;
+var cbRead, dwErr, nRemaining  : DWORD;
     bytes, retries  : LongInt;
     bOk     : Boolean;
 const INTERVAL = 10;  // ms
@@ -406,48 +425,61 @@
     end;
   end;
 
-  // read the data (or block INFINITE-ly)
-  bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], count, cbRead, nil);
-  if (not bOk) and (GetLastError() <> ERROR_MORE_DATA)
-  then result := 0 // No more data, possibly because client disconnected.
-  else result := cbRead;
+  result := 0;
+  nRemaining := count;
+  while nRemaining > 0 do begin
+    // read the data (or block INFINITE-ly)
+    bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], nRemaining, cbRead, nil);
+    if (not bOk) and (GetLastError() <> ERROR_MORE_DATA)
+    then Break; // No more data, possibly because client disconnected.
+
+    Dec( nRemaining, cbRead);
+    Inc( offset, cbRead);
+    Inc( result, cbRead);
+  end;
 end;
 
 
 function TPipeStreamBase.ReadOverlapped( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
-var cbRead, dwWait, dwError  : DWORD;
+var cbRead, dwWait, dwError, nRemaining : DWORD;
     bOk     : Boolean;
     overlapped : IOverlappedHelper;
 begin
   if not IsOpen
   then raise TTransportExceptionNotOpen.Create('Called read on non-open pipe');
 
-  overlapped := TOverlappedHelperImpl.Create;
+  result := 0;
+  nRemaining := count;
+  while nRemaining > 0 do begin
+    overlapped := TOverlappedHelperImpl.Create;
 
-  // read the data
-  bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], count, cbRead, overlapped.OverlappedPtr);
-  if not bOk then begin
-    dwError := GetLastError;
-    case dwError of
-      ERROR_IO_PENDING : begin
-        dwWait := overlapped.WaitFor(FTimeout);
+     // read the data
+    bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], nRemaining, cbRead, overlapped.OverlappedPtr);
+    if not bOk then begin
+      dwError := GetLastError;
+      case dwError of
+        ERROR_IO_PENDING : begin
+          dwWait := overlapped.WaitFor(FTimeout);
 
-        if (dwWait = WAIT_TIMEOUT)
-        then raise TTransportExceptionTimedOut.Create('Pipe read timed out');
+          if (dwWait = WAIT_TIMEOUT)
+          then raise TTransportExceptionTimedOut.Create('Pipe read timed out');
 
-        if (dwWait <> WAIT_OBJECT_0)
-        or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbRead, TRUE)
-        then raise TTransportExceptionUnknown.Create('Pipe read error');
+          if (dwWait <> WAIT_OBJECT_0)
+          or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbRead, TRUE)
+          then raise TTransportExceptionUnknown.Create('Pipe read error');
+        end;
+
+      else
+        raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
       end;
-
-    else
-      raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
     end;
-  end;
 
-  ASSERT( cbRead > 0);  // see TTransportImpl.ReadAll()
-  ASSERT( cbRead = DWORD(count));
-  result := cbRead;
+    ASSERT( cbRead > 0);  // see TTransportImpl.ReadAll()
+    ASSERT( cbRead <= DWORD(nRemaining));
+    Dec( nRemaining, cbRead);
+    Inc( offset, cbRead);
+    Inc( result, cbRead);
+  end;
 end;