THRIFT-4898 Pipe write operations across a network are limited to 65,535 bytes per write.
Client: netstd
Patch: Jens Geyer
This closes #1823
diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas
index c2660a2..e59c327 100644
--- a/lib/delphi/test/TestClient.pas
+++ b/lib/delphi/test/TestClient.pas
@@ -92,7 +92,8 @@
Empty, // Edge case: the zero-length empty binary
Normal, // Fairly small array of usual size (256 bytes)
ByteArrayTest, // THRIFT-4454 Large writes/reads may cause range check errors in debug mode
- PipeWriteLimit // THRIFT-4372 Pipe write operations across a network are limited to 65,535 bytes per write.
+ PipeWriteLimit, // THRIFT-4372 Pipe write operations across a network are limited to 65,535 bytes per write.
+ TwentyMB // that's quite a bit of data
);
private
@@ -537,12 +538,12 @@
// random binary small
for testsize := Low(TTestSize) to High(TTestSize) do begin
binOut := PrepareBinaryData( TRUE, testsize);
- Console.WriteLine('testBinary('+BytesToHex(binOut)+')');
+ Console.WriteLine('testBinary('+IntToStr(Length(binOut))+' bytes)');
try
binIn := client.testBinary(binOut);
- Expect( Length(binOut) = Length(binIn), 'testBinary(): length '+IntToStr(Length(binOut))+' = '+IntToStr(Length(binIn)));
+ Expect( Length(binOut) = Length(binIn), 'testBinary('+IntToStr(Length(binOut))+' bytes): '+IntToStr(Length(binIn))+' bytes received');
i32 := Min( Length(binOut), Length(binIn));
- Expect( CompareMem( binOut, binIn, i32), 'testBinary('+BytesToHex(binOut)+') = '+BytesToHex(binIn));
+ Expect( CompareMem( binOut, binIn, i32), 'testBinary('+IntToStr(Length(binOut))+' bytes): validating received data');
except
on e:TApplicationException do Console.WriteLine('testBinary(): '+e.Message);
on e:Exception do Expect( FALSE, 'testBinary(): Unexpected exception "'+e.ClassName+'": '+e.Message);
@@ -1023,6 +1024,7 @@
Normal : SetLength( result, $100);
ByteArrayTest : SetLength( result, SizeOf(TByteArray) + 128);
PipeWriteLimit : SetLength( result, 65535 + 128);
+ TwentyMB : SetLength( result, 20 * 1024 * 1024);
else
raise EArgumentException.Create('aSize');
end;
diff --git a/lib/delphi/test/TestServer.pas b/lib/delphi/test/TestServer.pas
index 4cb0090..2a80d52 100644
--- a/lib/delphi/test/TestServer.pas
+++ b/lib/delphi/test/TestServer.pas
@@ -144,7 +144,7 @@
function TTestServer.TTestHandlerImpl.testBinary(const thing: TBytes): TBytes;
begin
- Console.WriteLine('testBinary("' + BytesToHex( thing ) + '")');
+ Console.WriteLine('testBinary('+IntToStr(Length(thing)) + ' bytes)');
Result := thing;
end;
diff --git a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
index 2f96a6a..7dfe013 100644
--- a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+using System;
using System.IO.Pipes;
using System.Threading;
using System.Threading.Tasks;
@@ -24,7 +25,7 @@
// ReSharper disable once InconsistentNaming
public class TNamedPipeTransport : TTransport
{
- private NamedPipeClientStream _client;
+ private NamedPipeClientStream PipeStream;
private int ConnectTimeout;
public TNamedPipeTransport(string pipe, int timeout = Timeout.Infinite)
@@ -37,10 +38,10 @@
var serverName = string.IsNullOrWhiteSpace(server) ? server : ".";
ConnectTimeout = (timeout > 0) ? timeout : Timeout.Infinite;
- _client = new NamedPipeClientStream(serverName, pipe, PipeDirection.InOut, PipeOptions.None);
+ PipeStream = new NamedPipeClientStream(serverName, pipe, PipeDirection.InOut, PipeOptions.None);
}
- public override bool IsOpen => _client != null && _client.IsConnected;
+ public override bool IsOpen => PipeStream != null && PipeStream.IsConnected;
public override async Task OpenAsync(CancellationToken cancellationToken)
{
@@ -49,36 +50,46 @@
throw new TTransportException(TTransportException.ExceptionType.AlreadyOpen);
}
- await _client.ConnectAsync( ConnectTimeout, cancellationToken);
+ await PipeStream.ConnectAsync( ConnectTimeout, cancellationToken);
}
public override void Close()
{
- if (_client != null)
+ if (PipeStream != null)
{
- _client.Dispose();
- _client = null;
+ PipeStream.Dispose();
+ PipeStream = null;
}
}
public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
- if (_client == null)
+ if (PipeStream == null)
{
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
- return await _client.ReadAsync(buffer, offset, length, cancellationToken);
+ return await PipeStream.ReadAsync(buffer, offset, length, cancellationToken);
}
public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
- if (_client == null)
+ if (PipeStream == null)
{
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
- await _client.WriteAsync(buffer, offset, length, cancellationToken);
+ // if necessary, send the data in chunks
+ // there's a system limit around 0x10000 bytes that we hit otherwise
+ // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+ var nBytes = Math.Min(15 * 4096, length); // 16 would exceed the limit
+ while (nBytes > 0)
+ {
+ await PipeStream.WriteAsync(buffer, offset, nBytes, cancellationToken);
+ offset += nBytes;
+ length -= nBytes;
+ nBytes = Math.Min(nBytes, length);
+ }
}
public override async Task FlushAsync(CancellationToken cancellationToken)
@@ -91,7 +102,7 @@
protected override void Dispose(bool disposing)
{
- _client.Dispose();
+ PipeStream.Dispose();
}
}
}
diff --git a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
index 31a052a..77b8251 100644
--- a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
+++ b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
@@ -239,14 +239,14 @@
private class ServerTransport : TTransport
{
- private readonly NamedPipeServerStream _stream;
+ private readonly NamedPipeServerStream PipeStream;
public ServerTransport(NamedPipeServerStream stream)
{
- _stream = stream;
+ PipeStream = stream;
}
- public override bool IsOpen => _stream != null && _stream.IsConnected;
+ public override bool IsOpen => PipeStream != null && PipeStream.IsConnected;
public override async Task OpenAsync(CancellationToken cancellationToken)
{
@@ -258,27 +258,37 @@
public override void Close()
{
- _stream?.Dispose();
+ PipeStream?.Dispose();
}
public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
- if (_stream == null)
+ if (PipeStream == null)
{
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
- return await _stream.ReadAsync(buffer, offset, length, cancellationToken);
+ return await PipeStream.ReadAsync(buffer, offset, length, cancellationToken);
}
public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
- if (_stream == null)
+ if (PipeStream == null)
{
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
- await _stream.WriteAsync(buffer, offset, length, cancellationToken);
+ // if necessary, send the data in chunks
+ // there's a system limit around 0x10000 bytes that we hit otherwise
+ // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
+ var nBytes = Math.Min(15 * 4096, length); // 16 would exceed the limit
+ while (nBytes > 0)
+ {
+ await PipeStream.WriteAsync(buffer, offset, nBytes, cancellationToken);
+ offset += nBytes;
+ length -= nBytes;
+ nBytes = Math.Min(nBytes, length);
+ }
}
public override async Task FlushAsync(CancellationToken cancellationToken)
@@ -291,7 +301,7 @@
protected override void Dispose(bool disposing)
{
- _stream?.Dispose();
+ PipeStream?.Dispose();
}
}
}
diff --git a/test/netstd/Client/TestClient.cs b/test/netstd/Client/TestClient.cs
index 6be1023..0f58f95 100644
--- a/test/netstd/Client/TestClient.cs
+++ b/test/netstd/Client/TestClient.cs
@@ -73,7 +73,7 @@
public ProtocolChoice protocol = ProtocolChoice.Binary;
public TransportChoice transport = TransportChoice.Socket;
- internal void Parse( List<string> args)
+ internal void Parse(List<string> args)
{
for (var i = 0; i < args.Count; ++i)
{
@@ -220,18 +220,18 @@
{
throw new FileNotFoundException($"Cannot find file: {clientCertName}");
}
-
+
var cert = new X509Certificate2(existingPath, "thrift");
return cert;
}
-
+
public TTransport CreateTransport()
{
// endpoint transport
TTransport trans = null;
- switch(transport)
+ switch (transport)
{
case TransportChoice.Http:
Debug.Assert(url != null);
@@ -249,8 +249,8 @@
{
throw new InvalidOperationException("Certificate doesn't contain private key");
}
-
- trans = new TTlsSocketTransport(host, port, 0, cert,
+
+ trans = new TTlsSocketTransport(host, port, 0, cert,
(sender, certificate, chain, errors) => true,
null, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12);
break;
@@ -263,7 +263,7 @@
// layered transport
- switch(layered)
+ switch (layered)
{
case LayeredChoice.Buffered:
trans = new TBufferedTransport(trans);
@@ -436,15 +436,46 @@
return BitConverter.ToString(data).Replace("-", string.Empty);
}
- public static byte[] PrepareTestData(bool randomDist)
+
+ public enum BinaryTestSize
{
- var retval = new byte[0x100];
- var initLen = Math.Min(0x100, retval.Length);
+ Empty, // Edge case: the zero-length empty binary
+ Normal, // Fairly small array of usual size (256 bytes)
+ Large, // Large writes/reads may cause range check errors
+ PipeWriteLimit, // Windows Limit: Pipe write operations across a network are limited to 65,535 bytes per write.
+ TwentyMB // that's quite a bit of data
+ };
+
+ public static byte[] PrepareTestData(bool randomDist, BinaryTestSize testcase)
+ {
+ int amount = -1;
+ switch (testcase)
+ {
+ case BinaryTestSize.Empty:
+ amount = 0;
+ break;
+ case BinaryTestSize.Normal:
+ amount = 0x100;
+ break;
+ case BinaryTestSize.Large:
+ amount = 0x8000 + 128;
+ break;
+ case BinaryTestSize.PipeWriteLimit:
+ amount = 0xFFFF + 128;
+ break;
+ case BinaryTestSize.TwentyMB:
+ amount = 20 * 1024 * 1024;
+ break;
+ default:
+ throw new ArgumentException(nameof(testcase));
+ }
+
+ var retval = new byte[amount];
// linear distribution, unless random is requested
if (!randomDist)
{
- for (var i = 0; i < initLen; ++i)
+ for (var i = 0; i < retval.Length; ++i)
{
retval[i] = (byte)i;
}
@@ -452,22 +483,10 @@
}
// random distribution
- for (var i = 0; i < initLen; ++i)
- {
- retval[i] = (byte)0;
- }
var rnd = new Random();
- for (var i = 1; i < initLen; ++i)
+ for (var i = 1; i < retval.Length; ++i)
{
- while (true)
- {
- var nextPos = rnd.Next() % initLen;
- if (retval[nextPos] == 0)
- {
- retval[nextPos] = (byte)i;
- break;
- }
- }
+ retval[i] = (byte)rnd.Next(0x100);
}
return retval;
}
@@ -557,32 +576,39 @@
returnCode |= ErrorBaseTypes;
}
- var binOut = PrepareTestData(true);
- Console.Write("testBinary(" + BytesToHex(binOut) + ")");
- try
+ // testBinary()
+ foreach(BinaryTestSize binTestCase in Enum.GetValues(typeof(BinaryTestSize)))
{
- var binIn = await client.testBinaryAsync(binOut, MakeTimeoutToken());
- Console.WriteLine(" = " + BytesToHex(binIn));
- if (binIn.Length != binOut.Length)
+ var binOut = PrepareTestData(true, binTestCase);
+
+ Console.Write("testBinary({0} bytes)", binOut.Length);
+ try
{
- Console.WriteLine("*** FAILED ***");
- returnCode |= ErrorBaseTypes;
- }
- for (var ofs = 0; ofs < Math.Min(binIn.Length, binOut.Length); ++ofs)
- if (binIn[ofs] != binOut[ofs])
+ var binIn = await client.testBinaryAsync(binOut, MakeTimeoutToken());
+ Console.WriteLine(" = {0} bytes", binIn.Length);
+ if (binIn.Length != binOut.Length)
{
Console.WriteLine("*** FAILED ***");
returnCode |= ErrorBaseTypes;
}
- }
- catch (Thrift.TApplicationException ex)
- {
- Console.WriteLine("*** FAILED ***");
- returnCode |= ErrorBaseTypes;
- Console.WriteLine(ex.Message + " ST: " + ex.StackTrace);
+ for (var ofs = 0; ofs < Math.Min(binIn.Length, binOut.Length); ++ofs)
+ {
+ if (binIn[ofs] != binOut[ofs])
+ {
+ Console.WriteLine("*** FAILED ***");
+ returnCode |= ErrorBaseTypes;
+ }
+ }
+ }
+ catch (Thrift.TApplicationException ex)
+ {
+ Console.WriteLine("*** FAILED ***");
+ returnCode |= ErrorBaseTypes;
+ Console.WriteLine(ex.Message + " ST: " + ex.StackTrace);
+ }
}
- // binary equals?
+ // CrazyNesting
Console.WriteLine("Test CrazyNesting");
var one = new CrazyNesting();
var two = new CrazyNesting();
diff --git a/test/netstd/Server/TestServer.cs b/test/netstd/Server/TestServer.cs
index 82b36eb..25c2afc 100644
--- a/test/netstd/Server/TestServer.cs
+++ b/test/netstd/Server/TestServer.cs
@@ -246,8 +246,7 @@
public Task<byte[]> testBinaryAsync(byte[] thing, CancellationToken cancellationToken)
{
- var hex = BitConverter.ToString(thing).Replace("-", string.Empty);
- logger.Invoke("testBinary({0:X})", hex);
+ logger.Invoke("testBinary({0} bytes)", thing.Length);
return Task.FromResult(thing);
}