THRIFT-4898 Pipe write operations across a network are limited to 65,535 bytes per write.
Client: netstd
Patch: Jens Geyer
This closes #1823
diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas
index c2660a2..e59c327 100644
--- a/lib/delphi/test/TestClient.pas
+++ b/lib/delphi/test/TestClient.pas
@@ -92,7 +92,8 @@
Empty, // Edge case: the zero-length empty binary
Normal, // Fairly small array of usual size (256 bytes)
ByteArrayTest, // THRIFT-4454 Large writes/reads may cause range check errors in debug mode
- PipeWriteLimit // THRIFT-4372 Pipe write operations across a network are limited to 65,535 bytes per write.
+ PipeWriteLimit, // THRIFT-4372 Pipe write operations across a network are limited to 65,535 bytes per write.
+ TwentyMB // that's quite a bit of data
);
private
@@ -537,12 +538,12 @@
// random binary small
for testsize := Low(TTestSize) to High(TTestSize) do begin
binOut := PrepareBinaryData( TRUE, testsize);
- Console.WriteLine('testBinary('+BytesToHex(binOut)+')');
+ Console.WriteLine('testBinary('+IntToStr(Length(binOut))+' bytes)');
try
binIn := client.testBinary(binOut);
- Expect( Length(binOut) = Length(binIn), 'testBinary(): length '+IntToStr(Length(binOut))+' = '+IntToStr(Length(binIn)));
+ Expect( Length(binOut) = Length(binIn), 'testBinary('+IntToStr(Length(binOut))+' bytes): '+IntToStr(Length(binIn))+' bytes received');
i32 := Min( Length(binOut), Length(binIn));
- Expect( CompareMem( binOut, binIn, i32), 'testBinary('+BytesToHex(binOut)+') = '+BytesToHex(binIn));
+ Expect( CompareMem( binOut, binIn, i32), 'testBinary('+IntToStr(Length(binOut))+' bytes): validating received data');
except
on e:TApplicationException do Console.WriteLine('testBinary(): '+e.Message);
on e:Exception do Expect( FALSE, 'testBinary(): Unexpected exception "'+e.ClassName+'": '+e.Message);
@@ -1023,6 +1024,7 @@
Normal : SetLength( result, $100);
ByteArrayTest : SetLength( result, SizeOf(TByteArray) + 128);
PipeWriteLimit : SetLength( result, 65535 + 128);
+ TwentyMB : SetLength( result, 20 * 1024 * 1024);
else
raise EArgumentException.Create('aSize');
end;
diff --git a/lib/delphi/test/TestServer.pas b/lib/delphi/test/TestServer.pas
index 4cb0090..2a80d52 100644
--- a/lib/delphi/test/TestServer.pas
+++ b/lib/delphi/test/TestServer.pas
@@ -144,7 +144,7 @@
function TTestServer.TTestHandlerImpl.testBinary(const thing: TBytes): TBytes;
begin
- Console.WriteLine('testBinary("' + BytesToHex( thing ) + '")');
+ Console.WriteLine('testBinary('+IntToStr(Length(thing)) + ' bytes)');
Result := thing;
end;
diff --git a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
index 2f96a6a..7dfe013 100644
--- a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+using System;
using System.IO.Pipes;
using System.Threading;
using System.Threading.Tasks;
@@ -24,7 +25,7 @@
// ReSharper disable once InconsistentNaming
public class TNamedPipeTransport : TTransport
{
- private NamedPipeClientStream _client;
+ private NamedPipeClientStream PipeStream;
private int ConnectTimeout;
public TNamedPipeTransport(string pipe, int timeout = Timeout.Infinite)
@@ -37,10 +38,10 @@
var serverName = string.IsNullOrWhiteSpace(server) ? server : ".";
ConnectTimeout = (timeout > 0) ? timeout : Timeout.Infinite;
- _client = new NamedPipeClientStream(serverName, pipe, PipeDirection.InOut, PipeOptions.None);
+ PipeStream = new NamedPipeClientStream(serverName, pipe, PipeDirection.InOut, PipeOptions.None);
}
- public override bool IsOpen => _client != null && _client.IsConnected;
+ public override bool IsOpen => PipeStream != null && PipeStream.IsConnected;
public override async Task OpenAsync(CancellationToken cancellationToken)
{
@@ -49,36 +50,46 @@
throw new TTransportException(TTransportException.ExceptionType.AlreadyOpen);
}
- await _client.ConnectAsync( ConnectTimeout, cancellationToken);
+ await PipeStream.ConnectAsync( ConnectTimeout, cancellationToken);
}
public override void Close()
{
- if (_client != null)
+ if (PipeStream != null)
{
- _client.Dispose();
- _client = null;
+ PipeStream.Dispose();
+ PipeStream = null;
}
}
public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
- if (_client == null)
+ if (PipeStream == null)
{
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
- return await _client.ReadAsync(buffer, offset, length, cancellationToken);
+ return await PipeStream.ReadAsync(buffer, offset, length, cancellationToken);
}
public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
- if (_client == null)
+ if (PipeStream == null)
{
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
- await _client.WriteAsync(buffer, offset, length, cancellationToken);
+ // If necessary, split the data into chunks of at most 15 * 4096 (61,440) bytes per write.
+ // A single larger write would otherwise hit the system limit of just under 0x10000 bytes.
+ // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+ var nBytes = Math.Min(15 * 4096, length); // 16 would exceed the limit
+ while (nBytes > 0)
+ {
+ await PipeStream.WriteAsync(buffer, offset, nBytes, cancellationToken);
+ offset += nBytes;
+ length -= nBytes;
+ nBytes = Math.Min(nBytes, length);
+ }
}
public override async Task FlushAsync(CancellationToken cancellationToken)
@@ -91,7 +102,7 @@
protected override void Dispose(bool disposing)
{
- _client.Dispose();
+ PipeStream?.Dispose(); // Close() sets PipeStream to null, so a Dispose() after Close() must not dereference it
}
}
}
diff --git a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
index 31a052a..77b8251 100644
--- a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
+++ b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
@@ -239,14 +239,14 @@
private class ServerTransport : TTransport
{
- private readonly NamedPipeServerStream _stream;
+ private readonly NamedPipeServerStream PipeStream;
public ServerTransport(NamedPipeServerStream stream)
{
- _stream = stream;
+ PipeStream = stream;
}
- public override bool IsOpen => _stream != null && _stream.IsConnected;
+ public override bool IsOpen => PipeStream != null && PipeStream.IsConnected;
public override async Task OpenAsync(CancellationToken cancellationToken)
{
@@ -258,27 +258,37 @@
public override void Close()
{
- _stream?.Dispose();
+ PipeStream?.Dispose();
}
public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
- if (_stream == null)
+ if (PipeStream == null)
{
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
- return await _stream.ReadAsync(buffer, offset, length, cancellationToken);
+ return await PipeStream.ReadAsync(buffer, offset, length, cancellationToken);
}
public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
- if (_stream == null)
+ if (PipeStream == null)
{
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
- await _stream.WriteAsync(buffer, offset, length, cancellationToken);
+ // If necessary, split the data into chunks of at most 15 * 4096 (61,440) bytes per write.
+ // A single larger write would otherwise hit the system limit of just under 0x10000 bytes.
+ // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+ var nBytes = Math.Min(15 * 4096, length); // 16 would exceed the limit
+ while (nBytes > 0)
+ {
+ await PipeStream.WriteAsync(buffer, offset, nBytes, cancellationToken);
+ offset += nBytes;
+ length -= nBytes;
+ nBytes = Math.Min(nBytes, length);
+ }
}
public override async Task FlushAsync(CancellationToken cancellationToken)
@@ -291,7 +301,7 @@
protected override void Dispose(bool disposing)
{
- _stream?.Dispose();
+ PipeStream?.Dispose();
}
}
}