THRIFT-4372 Pipe write operations across a network are limited to 65,535 bytes per write
Client: Delphi, C#
Patch: Jens Geyer
This closes #1402
diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index 9b7f842..aace4bb 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -327,51 +327,70 @@
procedure TPipeStreamBase.WriteDirect( const pBuf : Pointer; offset: Integer; count: Integer);
-var cbWritten : DWORD;
+var cbWritten, nBytes : DWORD;
begin
if not IsOpen
then raise TTransportExceptionNotOpen.Create('Called write on non-open pipe');
- if not WriteFile( FPipe, PByteArray(pBuf)^[offset], count, cbWritten, nil)
- then raise TTransportExceptionNotOpen.Create('Write to pipe failed');
+ // If necessary, send the data in chunks: there is a system limit of about 0x10000 bytes per write that we would hit otherwise.
+ // MSDN (WriteFile documentation): "Pipe write operations across a network are limited to 65,535 bytes per write.
+ // For more information regarding pipes, see the Remarks section."
+ nBytes := Min( 15*4096, count); // 16 would exceed the limit
+ while nBytes > 0 do begin
+ if not WriteFile( FPipe, PByteArray(pBuf)^[offset], nBytes, cbWritten, nil)
+ then raise TTransportExceptionNotOpen.Create('Write to pipe failed');
+
+ Inc( offset, cbWritten);
+ Dec( count, cbWritten);
+ nBytes := Min( nBytes, count);
+ end;
end;
procedure TPipeStreamBase.WriteOverlapped( const pBuf : Pointer; offset: Integer; count: Integer);
-var cbWritten, dwWait, dwError : DWORD;
+var cbWritten, dwWait, dwError, nBytes : DWORD;
overlapped : IOverlappedHelper;
begin
if not IsOpen
then raise TTransportExceptionNotOpen.Create('Called write on non-open pipe');
- overlapped := TOverlappedHelperImpl.Create;
+ // If necessary, send the data in chunks: there is a system limit of about 0x10000 bytes per write that we would hit otherwise.
+ // MSDN (WriteFile documentation): "Pipe write operations across a network are limited to 65,535 bytes per write.
+ // For more information regarding pipes, see the Remarks section."
+ nBytes := Min( 15*4096, count); // 16 would exceed the limit
+ while nBytes > 0 do begin
+ overlapped := TOverlappedHelperImpl.Create;
+ if not WriteFile( FPipe, PByteArray(pBuf)^[offset], nBytes, cbWritten, overlapped.OverlappedPtr)
+ then begin
+ dwError := GetLastError;
+ case dwError of
+ ERROR_IO_PENDING : begin
+ dwWait := overlapped.WaitFor(FTimeout);
- if not WriteFile( FPipe, PByteArray(pBuf)^[offset], count, cbWritten, overlapped.OverlappedPtr)
- then begin
- dwError := GetLastError;
- case dwError of
- ERROR_IO_PENDING : begin
- dwWait := overlapped.WaitFor(FTimeout);
+ if (dwWait = WAIT_TIMEOUT)
+ then raise TTransportExceptionTimedOut.Create('Pipe write timed out');
- if (dwWait = WAIT_TIMEOUT)
- then raise TTransportExceptionTimedOut.Create('Pipe write timed out');
+ if (dwWait <> WAIT_OBJECT_0)
+ or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbWritten, TRUE)
+ then raise TTransportExceptionUnknown.Create('Pipe write error');
+ end;
- if (dwWait <> WAIT_OBJECT_0)
- or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbWritten, TRUE)
- then raise TTransportExceptionUnknown.Create('Pipe write error');
+ else
+ raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
end;
-
- else
- raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
end;
- end;
- ASSERT( DWORD(count) = cbWritten);
+ ASSERT( DWORD(nBytes) = cbWritten);
+
+ Inc( offset, cbWritten);
+ Dec( count, cbWritten);
+ nBytes := Min( nBytes, count);
+ end;
end;
function TPipeStreamBase.ReadDirect( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
-var cbRead, dwErr : DWORD;
+var cbRead, dwErr, nRemaining : DWORD;
bytes, retries : LongInt;
bOk : Boolean;
const INTERVAL = 10; // ms
@@ -406,48 +425,61 @@
end;
end;
- // read the data (or block INFINITE-ly)
- bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], count, cbRead, nil);
- if (not bOk) and (GetLastError() <> ERROR_MORE_DATA)
- then result := 0 // No more data, possibly because client disconnected.
- else result := cbRead;
+ result := 0;
+ nRemaining := count;
+ while nRemaining > 0 do begin
+ // read the data (this call blocks indefinitely while no data is available)
+ bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], nRemaining, cbRead, nil);
+ if (not bOk) and (GetLastError() <> ERROR_MORE_DATA)
+ then Break; // No more data, possibly because client disconnected.
+
+ Dec( nRemaining, cbRead);
+ Inc( offset, cbRead);
+ Inc( result, cbRead);
+ end;
end;
function TPipeStreamBase.ReadOverlapped( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
-var cbRead, dwWait, dwError : DWORD;
+var cbRead, dwWait, dwError, nRemaining : DWORD;
bOk : Boolean;
overlapped : IOverlappedHelper;
begin
if not IsOpen
then raise TTransportExceptionNotOpen.Create('Called read on non-open pipe');
- overlapped := TOverlappedHelperImpl.Create;
+ result := 0;
+ nRemaining := count;
+ while nRemaining > 0 do begin
+ overlapped := TOverlappedHelperImpl.Create;
- // read the data
- bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], count, cbRead, overlapped.OverlappedPtr);
- if not bOk then begin
- dwError := GetLastError;
- case dwError of
- ERROR_IO_PENDING : begin
- dwWait := overlapped.WaitFor(FTimeout);
+ // read the data
+ bOk := ReadFile( FPipe, PByteArray(pBuf)^[offset], nRemaining, cbRead, overlapped.OverlappedPtr);
+ if not bOk then begin
+ dwError := GetLastError;
+ case dwError of
+ ERROR_IO_PENDING : begin
+ dwWait := overlapped.WaitFor(FTimeout);
- if (dwWait = WAIT_TIMEOUT)
- then raise TTransportExceptionTimedOut.Create('Pipe read timed out');
+ if (dwWait = WAIT_TIMEOUT)
+ then raise TTransportExceptionTimedOut.Create('Pipe read timed out');
- if (dwWait <> WAIT_OBJECT_0)
- or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbRead, TRUE)
- then raise TTransportExceptionUnknown.Create('Pipe read error');
+ if (dwWait <> WAIT_OBJECT_0)
+ or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbRead, TRUE)
+ then raise TTransportExceptionUnknown.Create('Pipe read error');
+ end;
+
+ else
+ raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
end;
-
- else
- raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
end;
- end;
- ASSERT( cbRead > 0); // see TTransportImpl.ReadAll()
- ASSERT( cbRead = DWORD(count));
- result := cbRead;
+ ASSERT( cbRead > 0); // see TTransportImpl.ReadAll()
+ ASSERT( cbRead <= DWORD(nRemaining));
+ Dec( nRemaining, cbRead);
+ Inc( offset, cbRead);
+ Inc( result, cbRead);
+ end;
end;