THRIFT-5851 Promote known total stream sizes for seekable stream transports properly
Client: Delphi
Patch: Jens Geyer
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
index fd08837..4ca3831 100644
--- a/lib/delphi/src/Thrift.Transport.pas
+++ b/lib/delphi/src/Thrift.Transport.pas
@@ -68,7 +68,7 @@
function Configuration : IThriftConfiguration;
function MaxMessageSize : Integer;
- procedure ResetConsumedMessageSize( const knownSize : Int64 = -1);
+ procedure ResetMessageSizeAndConsumedBytes( const knownSize : Int64 = -1);
procedure CheckReadBytesAvailable( const numBytes : Int64);
procedure UpdateKnownMessageSize( const size : Int64);
end;
@@ -106,10 +106,10 @@
function MaxMessageSize : Integer;
property RemainingMessageSize : Int64 read FRemainingMessageSize;
property KnownMessageSize : Int64 read FKnownMessageSize;
- procedure ResetConsumedMessageSize( const newSize : Int64 = -1);
+ procedure ResetMessageSizeAndConsumedBytes( const newSize : Int64 = -1);
procedure UpdateKnownMessageSize(const size : Int64); override;
- procedure CheckReadBytesAvailable(const numBytes : Int64); inline;
- procedure CountConsumedMessageBytes(const numBytes : Int64); inline;
+ procedure CheckReadBytesAvailable(const numBytes : Int64); {$IFNDEF Debug} inline; {$ENDIF}
+ procedure CountConsumedMessageBytes(const numBytes : Int64); {$IFNDEF Debug} inline; {$ENDIF}
public
constructor Create( const aConfig : IThriftConfiguration); reintroduce;
end;
@@ -124,7 +124,7 @@
function Configuration : IThriftConfiguration; override;
procedure UpdateKnownMessageSize( const size : Int64); override;
function MaxMessageSize : Integer; inline;
- procedure ResetConsumedMessageSize( const knownSize : Int64 = -1); inline;
+ procedure ResetMessageSizeAndConsumedBytes( const knownSize : Int64 = -1); inline;
procedure CheckReadBytesAvailable( const numBytes : Int64); virtual;
public
constructor Create( const aTransport: T); reintroduce;
@@ -303,14 +303,18 @@
end;
TStreamTransportImpl = class( TEndpointTransportBase, IStreamTransport)
- strict protected
- FInputStream : IThriftStream;
- FOutputStream : IThriftStream;
+ strict private
+ FInternalInputStream : IThriftStream;
+ FInternalOutputStream : IThriftStream;
+
strict protected
function GetIsOpen: Boolean; override;
- function GetInputStream: IThriftStream;
- function GetOutputStream: IThriftStream;
+ function GetInputStream: IThriftStream; inline;
+ procedure SetInputStream( const stream : IThriftStream);
+
+ function GetOutputStream: IThriftStream; inline;
+ procedure SetOutputStream( const stream : IThriftStream);
strict protected
procedure Open; override;
@@ -318,6 +322,8 @@
procedure Flush; override;
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
+
+ procedure UpdateKnownMessageSize(const size : Int64); override;
public
constructor Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration = nil); reintroduce;
destructor Destroy; override;
@@ -340,6 +346,7 @@
procedure Flush; override;
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
+ function CanSeek : Boolean; override;
function Size : Int64; override;
function Position : Int64; override;
public
@@ -545,7 +552,7 @@
then FConfiguration := aConfig
else FConfiguration := TThriftConfigurationImpl.Create;
- ResetConsumedMessageSize;
+ ResetMessageSizeAndConsumedBytes;
end;
@@ -562,7 +569,7 @@
end;
-procedure TEndpointTransportBase.ResetConsumedMessageSize( const newSize : Int64);
+procedure TEndpointTransportBase.ResetMessageSizeAndConsumedBytes( const newSize : Int64);
// Resets RemainingMessageSize to the configured maximum
begin
// full reset
@@ -583,12 +590,12 @@
procedure TEndpointTransportBase.UpdateKnownMessageSize( const size : Int64);
-// Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport).
+// Updates RemainingMessageSize to reflect the known real message size (e.g. framed transport).
// Will throw if we already consumed too many bytes.
var consumed : Int64;
begin
consumed := KnownMessageSize - RemainingMessageSize;
- ResetConsumedMessageSize(size);
+ ResetMessageSizeAndConsumedBytes(size);
CountConsumedMessageBytes(consumed);
end;
@@ -642,9 +649,9 @@
end;
-procedure TLayeredTransportBase<T>.ResetConsumedMessageSize( const knownSize : Int64 = -1);
+procedure TLayeredTransportBase<T>.ResetMessageSizeAndConsumedBytes( const knownSize : Int64 = -1);
begin
- InnerTransport.ResetConsumedMessageSize( knownSize);
+ InnerTransport.ResetMessageSizeAndConsumedBytes( knownSize);
end;
@@ -964,8 +971,8 @@
begin
inherited Close;
- FInputStream := nil;
- FOutputStream := nil;
+ SetInputStream( nil);
+ SetOutputStream( nil);
if FOwnsClient
then FreeAndNil( FClient)
@@ -997,8 +1004,8 @@
FOwnsClient := True;
stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
- FInputStream := stream;
- FOutputStream := stream;
+ SetInputStream( stream);
+ SetOutputStream( stream);
end;
procedure TSocketImpl.Open;
@@ -1026,8 +1033,8 @@
FClient.Open;
{$ENDIF}
- FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
- FOutputStream := FInputStream;
+ SetInputStream( TTcpSocketStreamImpl.Create( FClient, FTimeout));
+ SetOutputStream( InputStream); // same
end;
{ TBufferedStream }
@@ -1156,6 +1163,12 @@
end;
+function TBufferedStreamImpl.CanSeek : Boolean;
+begin
+ result := TRUE;
+end;
+
+
function TBufferedStreamImpl.Size : Int64;
begin
result := FReadBuffer.Size;
@@ -1173,35 +1186,52 @@
constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration);
begin
inherited Create( aConfig);
- FInputStream := aInputStream;
- FOutputStream := aOutputStream;
+ SetInputStream( aInputStream);
+ SetOutputStream( aOutputStream);
end;
destructor TStreamTransportImpl.Destroy;
begin
- FInputStream := nil;
- FOutputStream := nil;
+ SetInputStream( nil);
+ SetInputStream( nil);
inherited;
end;
procedure TStreamTransportImpl.Close;
begin
- FInputStream := nil;
- FOutputStream := nil;
+ SetInputStream( nil);
+ SetInputStream( nil);
end;
procedure TStreamTransportImpl.Flush;
begin
- if FOutputStream = nil then begin
+ if OutputStream = nil then begin
raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );
end;
- FOutputStream.Flush;
+ OutputStream.Flush;
end;
function TStreamTransportImpl.GetInputStream: IThriftStream;
begin
- Result := FInputStream;
+ Result := FInternalInputStream;
+end;
+
+procedure TStreamTransportImpl.SetInputStream( const stream : IThriftStream);
+begin
+ FInternalInputStream := stream;
+ ResetMessageSizeAndConsumedBytes(-1); // full reset to configured maximum
+ UpdateKnownMessageSize( -1); // adjust to real stream size
+end;
+
+function TStreamTransportImpl.GetOutputStream: IThriftStream;
+begin
+ Result := FInternalOutputStream;
+end;
+
+procedure TStreamTransportImpl.SetOutputStream( const stream : IThriftStream);
+begin
+ FInternalOutputStream := stream;
end;
function TStreamTransportImpl.GetIsOpen: Boolean;
@@ -1209,11 +1239,6 @@
Result := True;
end;
-function TStreamTransportImpl.GetOutputStream: IThriftStream;
-begin
- Result := FOutputStream;
-end;
-
procedure TStreamTransportImpl.Open;
begin
// nothing to do
@@ -1221,19 +1246,36 @@
function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
- if FInputStream = nil
+ if InputStream = nil
then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
- Result := FInputStream.Read( pBuf,buflen, off, len );
+ Result := InputStream.Read( pBuf,buflen, off, len );
CountConsumedMessageBytes( result);
end;
procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
- if FOutputStream = nil
+ if OutputStream = nil
then raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );
- FOutputStream.Write( pBuf, off, len );
+ OutputStream.Write( pBuf, off, len );
+end;
+
+
+procedure TStreamTransportImpl.UpdateKnownMessageSize(const size : Int64);
+var adjusted : Int64;
+begin
+ if InputStream = nil
+ then adjusted := 0
+ else begin
+ adjusted := MaxMessageSize;
+ if size > 0
+ then adjusted := Math.Min( adjusted, size);
+ if InputStream.CanSeek
+ then adjusted := Math.Min( adjusted, InputStream.Size);
+ end;
+
+ inherited UpdateKnownMessageSize( adjusted);
end;
{ TBufferedTransportImpl }