THRIFT-4898 Pipe write operations across a network are limited to 65,535 bytes per write.
Client: netstd
Patch: Jens Geyer

This closes #1823
diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas
index c2660a2..e59c327 100644
--- a/lib/delphi/test/TestClient.pas
+++ b/lib/delphi/test/TestClient.pas
@@ -92,7 +92,8 @@
       Empty,           // Edge case: the zero-length empty binary
       Normal,          // Fairly small array of usual size (256 bytes)
       ByteArrayTest,   // THRIFT-4454 Large writes/reads may cause range check errors in debug mode
-      PipeWriteLimit   // THRIFT-4372 Pipe write operations across a network are limited to 65,535 bytes per write.
+      PipeWriteLimit,  // THRIFT-4372 Pipe write operations across a network are limited to 65,535 bytes per write.
+      TwentyMB         // that's quite a bit of data
     );
 
   private
@@ -537,12 +538,12 @@
   // random binary small
   for testsize := Low(TTestSize) to High(TTestSize) do begin
     binOut := PrepareBinaryData( TRUE, testsize);
-    Console.WriteLine('testBinary('+BytesToHex(binOut)+')');
+    Console.WriteLine('testBinary('+IntToStr(Length(binOut))+' bytes)');
     try
       binIn := client.testBinary(binOut);
-      Expect( Length(binOut) = Length(binIn), 'testBinary(): length '+IntToStr(Length(binOut))+' = '+IntToStr(Length(binIn)));
+      Expect( Length(binOut) = Length(binIn), 'testBinary('+IntToStr(Length(binOut))+' bytes): '+IntToStr(Length(binIn))+' bytes received');
       i32 := Min( Length(binOut), Length(binIn));
-      Expect( CompareMem( binOut, binIn, i32), 'testBinary('+BytesToHex(binOut)+') = '+BytesToHex(binIn));
+      Expect( CompareMem( binOut, binIn, i32), 'testBinary('+IntToStr(Length(binOut))+' bytes): validating received data');
     except
       on e:TApplicationException do Console.WriteLine('testBinary(): '+e.Message);
       on e:Exception do Expect( FALSE, 'testBinary(): Unexpected exception "'+e.ClassName+'": '+e.Message);
@@ -1023,6 +1024,7 @@
     Normal         : SetLength( result, $100);
     ByteArrayTest  : SetLength( result, SizeOf(TByteArray) + 128);
     PipeWriteLimit : SetLength( result, 65535 + 128);
+    TwentyMB       : SetLength( result, 20 * 1024 * 1024);
   else
     raise EArgumentException.Create('aSize');
   end;
diff --git a/lib/delphi/test/TestServer.pas b/lib/delphi/test/TestServer.pas
index 4cb0090..2a80d52 100644
--- a/lib/delphi/test/TestServer.pas
+++ b/lib/delphi/test/TestServer.pas
@@ -144,7 +144,7 @@
 
 function TTestServer.TTestHandlerImpl.testBinary(const thing: TBytes): TBytes;
 begin
-  Console.WriteLine('testBinary("' + BytesToHex( thing ) + '")');
+  Console.WriteLine('testBinary('+IntToStr(Length(thing)) + ' bytes)');
   Result := thing;
 end;
 
diff --git a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
index 2f96a6a..7dfe013 100644
--- a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System;
 using System.IO.Pipes;
 using System.Threading;
 using System.Threading.Tasks;
@@ -24,7 +25,7 @@
     // ReSharper disable once InconsistentNaming
     public class TNamedPipeTransport : TTransport
     {
-        private NamedPipeClientStream _client;
+        private NamedPipeClientStream PipeStream;
         private int ConnectTimeout;
 
         public TNamedPipeTransport(string pipe, int timeout = Timeout.Infinite) 
@@ -37,10 +38,10 @@
             var serverName = string.IsNullOrWhiteSpace(server) ? server : ".";
             ConnectTimeout = (timeout > 0) ? timeout : Timeout.Infinite;
 
-            _client = new NamedPipeClientStream(serverName, pipe, PipeDirection.InOut, PipeOptions.None);
+            PipeStream = new NamedPipeClientStream(serverName, pipe, PipeDirection.InOut, PipeOptions.None);
         }
 
-        public override bool IsOpen => _client != null && _client.IsConnected;
+        public override bool IsOpen => PipeStream != null && PipeStream.IsConnected;
 
         public override async Task OpenAsync(CancellationToken cancellationToken)
         {
@@ -49,36 +50,46 @@
                 throw new TTransportException(TTransportException.ExceptionType.AlreadyOpen);
             }
 
-            await _client.ConnectAsync( ConnectTimeout, cancellationToken);
+            await PipeStream.ConnectAsync( ConnectTimeout, cancellationToken);
         }
 
         public override void Close()
         {
-            if (_client != null)
+            if (PipeStream != null)
             {
-                _client.Dispose();
-                _client = null;
+                PipeStream.Dispose();
+                PipeStream = null;
             }
         }
 
         public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
         {
-            if (_client == null)
+            if (PipeStream == null)
             {
                 throw new TTransportException(TTransportException.ExceptionType.NotOpen);
             }
 
-            return await _client.ReadAsync(buffer, offset, length, cancellationToken);
+            return await PipeStream.ReadAsync(buffer, offset, length, cancellationToken);
         }
 
         public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
         {
-            if (_client == null)
+            if (PipeStream == null)
             {
                 throw new TTransportException(TTransportException.ExceptionType.NotOpen);
             }
 
-            await _client.WriteAsync(buffer, offset, length, cancellationToken);
+            // Send the data in chunks of at most 15 * 4096 = 61,440 bytes, one PipeStream.WriteAsync call per chunk.
+            // A single write of the whole buffer would otherwise hit a system limit of about 0x10000 bytes.
+            // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write."
+            var nBytes = Math.Min(15 * 4096, length); // 16 * 4096 = 65,536 would exceed the limit
+            while (nBytes > 0) // ends once 'length' has been counted down to zero
+            {
+                await PipeStream.WriteAsync(buffer, offset, nBytes, cancellationToken);
+                offset += nBytes; // advance past the chunk just written
+                length -= nBytes; // bytes still to be sent
+                nBytes = Math.Min(nBytes, length); // the final chunk may be shorter
+            }
         }
 
         public override async Task FlushAsync(CancellationToken cancellationToken)
@@ -91,7 +102,7 @@
 
         protected override void Dispose(bool disposing)
         {
-            _client.Dispose();
+            PipeStream?.Dispose(); // null-safe: Close() sets PipeStream to null after disposing it
         }
     }
 }
diff --git a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
index 31a052a..77b8251 100644
--- a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
+++ b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
@@ -239,14 +239,14 @@
 
         private class ServerTransport : TTransport
         {
-            private readonly NamedPipeServerStream _stream;
+            private readonly NamedPipeServerStream PipeStream;
 
             public ServerTransport(NamedPipeServerStream stream)
             {
-                _stream = stream;
+                PipeStream = stream;
             }
 
-            public override bool IsOpen => _stream != null && _stream.IsConnected;
+            public override bool IsOpen => PipeStream != null && PipeStream.IsConnected;
 
             public override async Task OpenAsync(CancellationToken cancellationToken)
             {
@@ -258,27 +258,37 @@
 
             public override void Close()
             {
-                _stream?.Dispose();
+                PipeStream?.Dispose();
             }
 
             public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
             {
-                if (_stream == null)
+                if (PipeStream == null)
                 {
                     throw new TTransportException(TTransportException.ExceptionType.NotOpen);
                 }
 
-                return await _stream.ReadAsync(buffer, offset, length, cancellationToken);
+                return await PipeStream.ReadAsync(buffer, offset, length, cancellationToken);
             }
 
             public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
             {
-                if (_stream == null)
+                if (PipeStream == null)
                 {
                     throw new TTransportException(TTransportException.ExceptionType.NotOpen);
                 }
 
-                await _stream.WriteAsync(buffer, offset, length, cancellationToken);
+                // Send the data in chunks of at most 15 * 4096 = 61,440 bytes, one PipeStream.WriteAsync call per chunk.
+                // A single write of the whole buffer would otherwise hit a system limit of about 0x10000 bytes.
+                // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write."
+                var nBytes = Math.Min(15 * 4096, length); // 16 * 4096 = 65,536 would exceed the limit
+                while (nBytes > 0) // ends once 'length' has been counted down to zero
+                {
+                    await PipeStream.WriteAsync(buffer, offset, nBytes, cancellationToken);
+                    offset += nBytes; // advance past the chunk just written
+                    length -= nBytes; // bytes still to be sent
+                    nBytes = Math.Min(nBytes, length); // the final chunk may be shorter
+                }
             }
 
             public override async Task FlushAsync(CancellationToken cancellationToken)
@@ -291,7 +301,7 @@
 
             protected override void Dispose(bool disposing)
             {
-                _stream?.Dispose();
+                PipeStream?.Dispose();
             }
         }
     }