THRIFT-5852 Promote known total stream sizes for seekable stream transports
Client: netstd
Patch: Jens Geyer
diff --git a/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs b/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs
index 186d220..44fa9f7 100644
--- a/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs
+++ b/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs
@@ -255,7 +255,7 @@
public override Task ReadMessageEndAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
- Transport.ResetConsumedMessageSize();
+ Transport.ResetMessageSizeAndConsumedBytes();
return Task.CompletedTask;
}
diff --git a/lib/netstd/Thrift/Protocol/TCompactProtocol.cs b/lib/netstd/Thrift/Protocol/TCompactProtocol.cs
index 8e5d00d..1fd7e50 100644
--- a/lib/netstd/Thrift/Protocol/TCompactProtocol.cs
+++ b/lib/netstd/Thrift/Protocol/TCompactProtocol.cs
@@ -461,7 +461,7 @@
public override Task ReadMessageEndAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
- Transport.ResetConsumedMessageSize();
+ Transport.ResetMessageSizeAndConsumedBytes();
return Task.CompletedTask;
}
diff --git a/lib/netstd/Thrift/Protocol/TJSONProtocol.cs b/lib/netstd/Thrift/Protocol/TJSONProtocol.cs
index 02cf3ed..ae37a80 100644
--- a/lib/netstd/Thrift/Protocol/TJSONProtocol.cs
+++ b/lib/netstd/Thrift/Protocol/TJSONProtocol.cs
@@ -700,7 +700,7 @@
{
cancellationToken.ThrowIfCancellationRequested();
await ReadJsonArrayEndAsync(cancellationToken);
- Transport.ResetConsumedMessageSize();
+ Transport.ResetMessageSizeAndConsumedBytes();
}
public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
diff --git a/lib/netstd/Thrift/Protocol/TProtocolDecorator.cs b/lib/netstd/Thrift/Protocol/TProtocolDecorator.cs
index c75cc63..8e7fb94 100644
--- a/lib/netstd/Thrift/Protocol/TProtocolDecorator.cs
+++ b/lib/netstd/Thrift/Protocol/TProtocolDecorator.cs
@@ -158,7 +158,7 @@
{
cancellationToken.ThrowIfCancellationRequested();
await _wrappedProtocol.ReadMessageEndAsync(cancellationToken);
- Transport.ResetConsumedMessageSize();
+ Transport.ResetMessageSizeAndConsumedBytes();
}
public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
diff --git a/lib/netstd/Thrift/Transport/Client/THttpTransport.cs b/lib/netstd/Thrift/Transport/Client/THttpTransport.cs
index 16e228b..4467681 100644
--- a/lib/netstd/Thrift/Transport/Client/THttpTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/THttpTransport.cs
@@ -278,7 +278,7 @@
finally
{
_outputStream = new MemoryStream();
- ResetConsumedMessageSize();
+ ResetMessageSizeAndConsumedBytes();
}
}
diff --git a/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs b/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs
index 5773d30..4701378 100644
--- a/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs
@@ -115,7 +115,7 @@
throw new ArgumentException("Cannot seek outside of the valid range",nameof(origin));
Position = newPos;
- ResetConsumedMessageSize();
+ ResetMessageSizeAndConsumedBytes();
CountConsumedMessageBytes(Position);
}
@@ -145,7 +145,7 @@
public override Task FlushAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
- ResetConsumedMessageSize();
+ ResetMessageSizeAndConsumedBytes();
return Task.CompletedTask;
}
diff --git a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
index 8e60f9f..3ed1b6c 100644
--- a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
@@ -54,7 +54,7 @@
}
await PipeStream.ConnectAsync( ConnectTimeout, cancellationToken);
- ResetConsumedMessageSize();
+ ResetMessageSizeAndConsumedBytes();
}
public override void Close()
@@ -112,7 +112,7 @@
public override async Task FlushAsync(CancellationToken cancellationToken)
{
await PipeStream.FlushAsync(cancellationToken);
- ResetConsumedMessageSize();
+ ResetMessageSizeAndConsumedBytes();
}
diff --git a/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs b/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs
index 7237b8d..a195001 100644
--- a/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs
@@ -16,6 +16,7 @@
// under the License.
using System;
+using System.Drawing;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
@@ -46,10 +47,28 @@
get => _InputStream;
set {
_InputStream = value;
- ResetConsumedMessageSize();
+ ResetMessageSizeAndConsumedBytes(-1); // full reset to configured maximum
+ UpdateKnownMessageSize(-1); // adjust to real stream size
}
}
+ public override void UpdateKnownMessageSize(long size)
+ {
+ long adjusted = 0;
+
+ if (InputStream != null)
+ {
+ adjusted = MaxMessageSize;
+ if (size > 0)
+ adjusted = Math.Min(adjusted, size);
+ if( InputStream.CanSeek)
+ adjusted = Math.Min(adjusted, InputStream.Length);
+ }
+
+ base.UpdateKnownMessageSize(adjusted);
+ }
+
+
public override bool IsOpen => true;
public override Task OpenAsync(CancellationToken cancellationToken)
@@ -106,7 +125,7 @@
public override async Task FlushAsync(CancellationToken cancellationToken)
{
await OutputStream.FlushAsync(cancellationToken);
- ResetConsumedMessageSize();
+ ResetMessageSizeAndConsumedBytes();
}
diff --git a/lib/netstd/Thrift/Transport/Layered/TBufferedTransport.cs b/lib/netstd/Thrift/Transport/Layered/TBufferedTransport.cs
index 7474b7f..977dcbf 100644
--- a/lib/netstd/Thrift/Transport/Layered/TBufferedTransport.cs
+++ b/lib/netstd/Thrift/Transport/Layered/TBufferedTransport.cs
@@ -179,10 +179,10 @@
}
}
- public override void ResetConsumedMessageSize(long newSize = -1)
+ public override void ResetMessageSizeAndConsumedBytes(long newSize = -1)
{
- base.ResetConsumedMessageSize(newSize);
- ReadBuffer.ResetConsumedMessageSize(newSize);
+ base.ResetMessageSizeAndConsumedBytes(newSize);
+ ReadBuffer.ResetMessageSizeAndConsumedBytes(newSize);
}
private void CheckNotDisposed()
diff --git a/lib/netstd/Thrift/Transport/Layered/TFramedTransport.cs b/lib/netstd/Thrift/Transport/Layered/TFramedTransport.cs
index 5e67f10..faa3fa6 100644
--- a/lib/netstd/Thrift/Transport/Layered/TFramedTransport.cs
+++ b/lib/netstd/Thrift/Transport/Layered/TFramedTransport.cs
@@ -191,10 +191,10 @@
IsDisposed = true;
}
- public override void ResetConsumedMessageSize(long newSize = -1)
+ public override void ResetMessageSizeAndConsumedBytes(long newSize = -1)
{
- base.ResetConsumedMessageSize(newSize);
- ReadBuffer.ResetConsumedMessageSize(newSize);
+ base.ResetMessageSizeAndConsumedBytes(newSize);
+ ReadBuffer.ResetMessageSizeAndConsumedBytes(newSize);
}
}
}
diff --git a/lib/netstd/Thrift/Transport/Layered/TLayeredTransport.cs b/lib/netstd/Thrift/Transport/Layered/TLayeredTransport.cs
index 3d855a4..f9a4ce4 100644
--- a/lib/netstd/Thrift/Transport/Layered/TLayeredTransport.cs
+++ b/lib/netstd/Thrift/Transport/Layered/TLayeredTransport.cs
@@ -43,9 +43,9 @@
InnerTransport.CheckReadBytesAvailable(numBytes);
}
- public override void ResetConsumedMessageSize(long newSize = -1)
+ public override void ResetMessageSizeAndConsumedBytes(long newSize = -1)
{
- InnerTransport.ResetConsumedMessageSize(newSize);
+ InnerTransport.ResetMessageSizeAndConsumedBytes(newSize);
}
}
}
diff --git a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
index 12fc9d3..1b6a21f 100644
--- a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
+++ b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
@@ -424,7 +424,7 @@
public override async Task FlushAsync(CancellationToken cancellationToken)
{
await PipeStream.FlushAsync(cancellationToken);
- ResetConsumedMessageSize();
+ ResetMessageSizeAndConsumedBytes();
}
protected override void Dispose(bool disposing)
diff --git a/lib/netstd/Thrift/Transport/TEndpointTransport.cs b/lib/netstd/Thrift/Transport/TEndpointTransport.cs
index 6c78101..27fb48d 100644
--- a/lib/netstd/Thrift/Transport/TEndpointTransport.cs
+++ b/lib/netstd/Thrift/Transport/TEndpointTransport.cs
@@ -37,13 +37,13 @@
_configuration = config ?? new TConfiguration();
Debug.Assert(Configuration != null);
- ResetConsumedMessageSize();
+ ResetMessageSizeAndConsumedBytes();
}
/// <summary>
/// Resets RemainingMessageSize to the configured maximum
/// </summary>
- public override void ResetConsumedMessageSize(long newSize = -1)
+ public override void ResetMessageSizeAndConsumedBytes(long newSize = -1)
{
// full reset
if (newSize < 0)
@@ -70,7 +70,7 @@
public override void UpdateKnownMessageSize(long size)
{
var consumed = KnownMessageSize - RemainingMessageSize;
- ResetConsumedMessageSize(size);
+ ResetMessageSizeAndConsumedBytes(size);
CountConsumedMessageBytes(consumed);
}
@@ -80,7 +80,7 @@
/// <param name="numBytes"></param>
public override void CheckReadBytesAvailable(long numBytes)
{
- if (RemainingMessageSize < numBytes)
+ if ((RemainingMessageSize < numBytes) || (numBytes < 0))
throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "MaxMessageSize reached");
}
diff --git a/lib/netstd/Thrift/Transport/TTransport.cs b/lib/netstd/Thrift/Transport/TTransport.cs
index 4420224..77ef281 100644
--- a/lib/netstd/Thrift/Transport/TTransport.cs
+++ b/lib/netstd/Thrift/Transport/TTransport.cs
@@ -39,7 +39,7 @@
public abstract TConfiguration Configuration { get; }
public abstract void UpdateKnownMessageSize(long size);
public abstract void CheckReadBytesAvailable(long numBytes);
- public abstract void ResetConsumedMessageSize(long newSize = -1);
+ public abstract void ResetMessageSizeAndConsumedBytes(long newSize = -1);
public void Dispose()
{
Dispose(true);