THRIFT-5852 Promote known total stream sizes for seekable stream transports
Client: netstd
Patch: Jens Geyer
diff --git a/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs b/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs
index 186d220..44fa9f7 100644
--- a/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs
+++ b/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs
@@ -255,7 +255,7 @@
         public override Task ReadMessageEndAsync(CancellationToken cancellationToken)
         {
             cancellationToken.ThrowIfCancellationRequested();
-            Transport.ResetConsumedMessageSize();
+            Transport.ResetMessageSizeAndConsumedBytes();
             return Task.CompletedTask;
         }
 
diff --git a/lib/netstd/Thrift/Protocol/TCompactProtocol.cs b/lib/netstd/Thrift/Protocol/TCompactProtocol.cs
index 8e5d00d..1fd7e50 100644
--- a/lib/netstd/Thrift/Protocol/TCompactProtocol.cs
+++ b/lib/netstd/Thrift/Protocol/TCompactProtocol.cs
@@ -461,7 +461,7 @@
         public override Task ReadMessageEndAsync(CancellationToken cancellationToken)
         {
             cancellationToken.ThrowIfCancellationRequested();
-            Transport.ResetConsumedMessageSize();
+            Transport.ResetMessageSizeAndConsumedBytes();
             return Task.CompletedTask;
         }
 
diff --git a/lib/netstd/Thrift/Protocol/TJSONProtocol.cs b/lib/netstd/Thrift/Protocol/TJSONProtocol.cs
index 02cf3ed..ae37a80 100644
--- a/lib/netstd/Thrift/Protocol/TJSONProtocol.cs
+++ b/lib/netstd/Thrift/Protocol/TJSONProtocol.cs
@@ -700,7 +700,7 @@
         {
             cancellationToken.ThrowIfCancellationRequested();
             await ReadJsonArrayEndAsync(cancellationToken);
-            Transport.ResetConsumedMessageSize();
+            Transport.ResetMessageSizeAndConsumedBytes();
         }
 
         public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
diff --git a/lib/netstd/Thrift/Protocol/TProtocolDecorator.cs b/lib/netstd/Thrift/Protocol/TProtocolDecorator.cs
index c75cc63..8e7fb94 100644
--- a/lib/netstd/Thrift/Protocol/TProtocolDecorator.cs
+++ b/lib/netstd/Thrift/Protocol/TProtocolDecorator.cs
@@ -158,7 +158,7 @@
         {
             cancellationToken.ThrowIfCancellationRequested();
             await _wrappedProtocol.ReadMessageEndAsync(cancellationToken);
-            Transport.ResetConsumedMessageSize();
+            Transport.ResetMessageSizeAndConsumedBytes();
         }
 
         public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
diff --git a/lib/netstd/Thrift/Transport/Client/THttpTransport.cs b/lib/netstd/Thrift/Transport/Client/THttpTransport.cs
index 16e228b..4467681 100644
--- a/lib/netstd/Thrift/Transport/Client/THttpTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/THttpTransport.cs
@@ -278,7 +278,7 @@
             finally
             {
                 _outputStream = new MemoryStream();
-                ResetConsumedMessageSize();
+                ResetMessageSizeAndConsumedBytes();
             }
         }
 
diff --git a/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs b/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs
index 5773d30..4701378 100644
--- a/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs
@@ -115,7 +115,7 @@
                 throw new ArgumentException("Cannot seek outside of the valid range",nameof(origin));
             Position = newPos;
 
-            ResetConsumedMessageSize();
+            ResetMessageSizeAndConsumedBytes();
             CountConsumedMessageBytes(Position);
         }
 
@@ -145,7 +145,7 @@
         public override Task FlushAsync(CancellationToken cancellationToken)
         {
             cancellationToken.ThrowIfCancellationRequested();
-            ResetConsumedMessageSize();
+            ResetMessageSizeAndConsumedBytes();
             return Task.CompletedTask;
         }
 
diff --git a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
index 8e60f9f..3ed1b6c 100644
--- a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
@@ -54,7 +54,7 @@
             }
 
             await PipeStream.ConnectAsync( ConnectTimeout, cancellationToken);
-            ResetConsumedMessageSize();
+            ResetMessageSizeAndConsumedBytes();
         }
 
         public override void Close()
@@ -112,7 +112,7 @@
         public override async Task FlushAsync(CancellationToken cancellationToken)
         {
             await PipeStream.FlushAsync(cancellationToken);
-            ResetConsumedMessageSize();
+            ResetMessageSizeAndConsumedBytes();
         }
 
         
diff --git a/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs b/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs
index 7237b8d..a195001 100644
--- a/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System;
+using System.Drawing;
 using System.IO;
 using System.Threading;
 using System.Threading.Tasks;
@@ -46,10 +47,28 @@
             get => _InputStream;
             set {
                 _InputStream = value;
-                ResetConsumedMessageSize();
+                ResetMessageSizeAndConsumedBytes(-1);  // full reset to configured maximum
+                UpdateKnownMessageSize(-1);            // adjust to real stream size
             }
         }
 
+        public override void UpdateKnownMessageSize(long size)
+        {
+            long adjusted = 0;
+
+            if (InputStream != null)
+            {
+                adjusted = MaxMessageSize;
+                if (size > 0)
+                    adjusted = Math.Min(adjusted, size);
+                if( InputStream.CanSeek)
+                    adjusted = Math.Min(adjusted, InputStream.Length);
+            }
+
+            base.UpdateKnownMessageSize(adjusted);
+        }
+
+
         public override bool IsOpen => true;
 
         public override Task OpenAsync(CancellationToken cancellationToken)
@@ -106,7 +125,7 @@
         public override async Task FlushAsync(CancellationToken cancellationToken)
         {
             await OutputStream.FlushAsync(cancellationToken);
-            ResetConsumedMessageSize();
+            ResetMessageSizeAndConsumedBytes();
         }
 
 
diff --git a/lib/netstd/Thrift/Transport/Layered/TBufferedTransport.cs b/lib/netstd/Thrift/Transport/Layered/TBufferedTransport.cs
index 7474b7f..977dcbf 100644
--- a/lib/netstd/Thrift/Transport/Layered/TBufferedTransport.cs
+++ b/lib/netstd/Thrift/Transport/Layered/TBufferedTransport.cs
@@ -179,10 +179,10 @@
             }
         }
 
-        public override void ResetConsumedMessageSize(long newSize = -1)
+        public override void ResetMessageSizeAndConsumedBytes(long newSize = -1)
         {
-            base.ResetConsumedMessageSize(newSize);
-            ReadBuffer.ResetConsumedMessageSize(newSize);
+            base.ResetMessageSizeAndConsumedBytes(newSize);
+            ReadBuffer.ResetMessageSizeAndConsumedBytes(newSize);
         }
 
         private void CheckNotDisposed()
diff --git a/lib/netstd/Thrift/Transport/Layered/TFramedTransport.cs b/lib/netstd/Thrift/Transport/Layered/TFramedTransport.cs
index 5e67f10..faa3fa6 100644
--- a/lib/netstd/Thrift/Transport/Layered/TFramedTransport.cs
+++ b/lib/netstd/Thrift/Transport/Layered/TFramedTransport.cs
@@ -191,10 +191,10 @@
             IsDisposed = true;
         }
 
-        public override void ResetConsumedMessageSize(long newSize = -1)
+        public override void ResetMessageSizeAndConsumedBytes(long newSize = -1)
         {
-            base.ResetConsumedMessageSize(newSize);
-            ReadBuffer.ResetConsumedMessageSize(newSize);
+            base.ResetMessageSizeAndConsumedBytes(newSize);
+            ReadBuffer.ResetMessageSizeAndConsumedBytes(newSize);
         }
     }
 }
diff --git a/lib/netstd/Thrift/Transport/Layered/TLayeredTransport.cs b/lib/netstd/Thrift/Transport/Layered/TLayeredTransport.cs
index 3d855a4..f9a4ce4 100644
--- a/lib/netstd/Thrift/Transport/Layered/TLayeredTransport.cs
+++ b/lib/netstd/Thrift/Transport/Layered/TLayeredTransport.cs
@@ -43,9 +43,9 @@
             InnerTransport.CheckReadBytesAvailable(numBytes);
         }
 
-        public override void ResetConsumedMessageSize(long newSize = -1)
+        public override void ResetMessageSizeAndConsumedBytes(long newSize = -1)
         {
-            InnerTransport.ResetConsumedMessageSize(newSize);
+            InnerTransport.ResetMessageSizeAndConsumedBytes(newSize);
         }
     }
 }
diff --git a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
index 12fc9d3..1b6a21f 100644
--- a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
+++ b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
@@ -424,7 +424,7 @@
             public override async Task FlushAsync(CancellationToken cancellationToken)
             {
                 await PipeStream.FlushAsync(cancellationToken);
-                ResetConsumedMessageSize();
+                ResetMessageSizeAndConsumedBytes();
             }
 
             protected override void Dispose(bool disposing)
diff --git a/lib/netstd/Thrift/Transport/TEndpointTransport.cs b/lib/netstd/Thrift/Transport/TEndpointTransport.cs
index 6c78101..27fb48d 100644
--- a/lib/netstd/Thrift/Transport/TEndpointTransport.cs
+++ b/lib/netstd/Thrift/Transport/TEndpointTransport.cs
@@ -37,13 +37,13 @@
             _configuration = config ?? new TConfiguration();
             Debug.Assert(Configuration != null);
 
-            ResetConsumedMessageSize();
+            ResetMessageSizeAndConsumedBytes();
         }
 
         /// <summary>
         /// Resets RemainingMessageSize to the configured maximum 
         /// </summary>
-        public override void ResetConsumedMessageSize(long newSize = -1)
+        public override void ResetMessageSizeAndConsumedBytes(long newSize = -1)
         {
             // full reset 
             if (newSize < 0)
@@ -70,7 +70,7 @@
         public override void UpdateKnownMessageSize(long size)
         {
             var consumed = KnownMessageSize - RemainingMessageSize;
-            ResetConsumedMessageSize(size);
+            ResetMessageSizeAndConsumedBytes(size);
             CountConsumedMessageBytes(consumed);
         }
 
@@ -80,7 +80,7 @@
         /// <param name="numBytes"></param>
         public override void CheckReadBytesAvailable(long numBytes)
         {
-            if (RemainingMessageSize < numBytes)
+            if ((RemainingMessageSize < numBytes) || (numBytes < 0))
                 throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "MaxMessageSize reached");
         }
 
diff --git a/lib/netstd/Thrift/Transport/TTransport.cs b/lib/netstd/Thrift/Transport/TTransport.cs
index 4420224..77ef281 100644
--- a/lib/netstd/Thrift/Transport/TTransport.cs
+++ b/lib/netstd/Thrift/Transport/TTransport.cs
@@ -39,7 +39,7 @@
         public abstract TConfiguration Configuration { get; }
         public abstract void UpdateKnownMessageSize(long size);
         public abstract void CheckReadBytesAvailable(long numBytes);
-        public abstract void ResetConsumedMessageSize(long newSize = -1);
+        public abstract void ResetMessageSizeAndConsumedBytes(long newSize = -1);
         public void Dispose()
         {
             Dispose(true);