THRIFT-5851 Promote known total stream sizes for seekable stream transports properly
Client: Delphi
Patch: Jens Geyer
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
index fd08837..4ca3831 100644
--- a/lib/delphi/src/Thrift.Transport.pas
+++ b/lib/delphi/src/Thrift.Transport.pas
@@ -68,7 +68,7 @@
 
     function  Configuration : IThriftConfiguration;
     function  MaxMessageSize : Integer;
-    procedure ResetConsumedMessageSize( const knownSize : Int64 = -1);
+    procedure ResetMessageSizeAndConsumedBytes( const knownSize : Int64 = -1);
     procedure CheckReadBytesAvailable( const numBytes : Int64);
     procedure UpdateKnownMessageSize( const size : Int64);
   end;
@@ -106,10 +106,10 @@
     function  MaxMessageSize : Integer;
     property  RemainingMessageSize : Int64 read FRemainingMessageSize;
     property  KnownMessageSize : Int64 read FKnownMessageSize;
-    procedure ResetConsumedMessageSize( const newSize : Int64 = -1);
+    procedure ResetMessageSizeAndConsumedBytes( const newSize : Int64 = -1);
     procedure UpdateKnownMessageSize(const size : Int64); override;
-    procedure CheckReadBytesAvailable(const numBytes : Int64); inline;
-    procedure CountConsumedMessageBytes(const numBytes : Int64); inline;
+    procedure CheckReadBytesAvailable(const numBytes : Int64); {$IFNDEF Debug} inline; {$ENDIF}
+    procedure CountConsumedMessageBytes(const numBytes : Int64); {$IFNDEF Debug} inline; {$ENDIF}
   public
     constructor Create( const aConfig : IThriftConfiguration);  reintroduce;
   end;
@@ -124,7 +124,7 @@
     function  Configuration : IThriftConfiguration; override;
     procedure UpdateKnownMessageSize( const size : Int64); override;
     function  MaxMessageSize : Integer;  inline;
-    procedure ResetConsumedMessageSize( const knownSize : Int64 = -1);  inline;
+    procedure ResetMessageSizeAndConsumedBytes( const knownSize : Int64 = -1);  inline;
     procedure CheckReadBytesAvailable( const numBytes : Int64);   virtual;
   public
     constructor Create( const aTransport: T); reintroduce;
@@ -303,14 +303,18 @@
   end;
 
   TStreamTransportImpl = class( TEndpointTransportBase, IStreamTransport)
-  strict protected
-    FInputStream : IThriftStream;
-    FOutputStream : IThriftStream;
+  strict private
+    FInternalInputStream : IThriftStream;
+    FInternalOutputStream : IThriftStream;
+
   strict protected
     function GetIsOpen: Boolean; override;
 
-    function GetInputStream: IThriftStream;
-    function GetOutputStream: IThriftStream;
+    function GetInputStream: IThriftStream; inline;
+    procedure SetInputStream( const stream : IThriftStream);
+
+    function GetOutputStream: IThriftStream; inline;
+    procedure SetOutputStream( const stream : IThriftStream);
 
   strict protected
     procedure Open; override;
@@ -318,6 +322,8 @@
     procedure Flush; override;
     function  Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
     procedure Write( const pBuf : Pointer; off, len : Integer); override;
+
+    procedure UpdateKnownMessageSize(const size : Int64); override;
   public
     constructor Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration = nil);  reintroduce;
     destructor Destroy; override;
@@ -340,6 +346,7 @@
     procedure Flush; override;
     function IsOpen: Boolean; override;
     function ToArray: TBytes; override;
+    function CanSeek : Boolean; override;
     function Size : Int64; override;
     function Position : Int64; override;
   public
@@ -545,7 +552,7 @@
   then FConfiguration := aConfig
   else FConfiguration := TThriftConfigurationImpl.Create;
 
-  ResetConsumedMessageSize;
+  ResetMessageSizeAndConsumedBytes;
 end;
 
 
@@ -562,7 +569,7 @@
 end;
 
 
-procedure TEndpointTransportBase.ResetConsumedMessageSize( const newSize : Int64);
+procedure TEndpointTransportBase.ResetMessageSizeAndConsumedBytes( const newSize : Int64);
 // Resets RemainingMessageSize to the configured maximum
 begin
   // full reset
@@ -583,12 +590,12 @@
 
 
 procedure TEndpointTransportBase.UpdateKnownMessageSize( const size : Int64);
-// Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport).
+// Updates RemainingMessageSize to reflect the known real message size (e.g. framed transport).
 // Will throw if we already consumed too many bytes.
 var consumed : Int64;
 begin
   consumed := KnownMessageSize - RemainingMessageSize;
-  ResetConsumedMessageSize(size);
+  ResetMessageSizeAndConsumedBytes(size);
   CountConsumedMessageBytes(consumed);
 end;
 
@@ -642,9 +649,9 @@
 end;
 
 
-procedure TLayeredTransportBase<T>.ResetConsumedMessageSize( const knownSize : Int64 = -1);
+procedure TLayeredTransportBase<T>.ResetMessageSizeAndConsumedBytes( const knownSize : Int64 = -1);
 begin
-  InnerTransport.ResetConsumedMessageSize( knownSize);
+  InnerTransport.ResetMessageSizeAndConsumedBytes( knownSize);
 end;
 
 
@@ -964,8 +971,8 @@
 begin
   inherited Close;
 
-  FInputStream := nil;
-  FOutputStream := nil;
+  SetInputStream( nil);
+  SetOutputStream( nil);
 
   if FOwnsClient
   then FreeAndNil( FClient)
@@ -997,8 +1004,8 @@
   FOwnsClient := True;
 
   stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
-  FInputStream := stream;
-  FOutputStream := stream;
+  SetInputStream( stream);
+  SetOutputStream( stream);
 end;
 
 procedure TSocketImpl.Open;
@@ -1026,8 +1033,8 @@
   FClient.Open;
 {$ENDIF}
 
-  FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
-  FOutputStream := FInputStream;
+  SetInputStream( TTcpSocketStreamImpl.Create( FClient, FTimeout));
+  SetOutputStream( InputStream);  // same
 end;
 
 { TBufferedStream }
@@ -1156,6 +1163,12 @@
 end;
 
 
+function TBufferedStreamImpl.CanSeek : Boolean;
+begin
+  result := TRUE;
+end;
+
+
 function TBufferedStreamImpl.Size : Int64;
 begin
   result := FReadBuffer.Size;
@@ -1173,35 +1186,52 @@
 constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration);
 begin
   inherited Create( aConfig);
-  FInputStream := aInputStream;
-  FOutputStream := aOutputStream;
+  SetInputStream( aInputStream);
+  SetOutputStream( aOutputStream);
 end;
 
 destructor TStreamTransportImpl.Destroy;
 begin
-  FInputStream := nil;
-  FOutputStream := nil;
+  SetInputStream( nil);
+  SetOutputStream( nil);  // release the output stream as well (was a duplicated SetInputStream call)
   inherited;
 end;
 
 procedure TStreamTransportImpl.Close;
 begin
-  FInputStream := nil;
-  FOutputStream := nil;
+  SetInputStream( nil);
+  SetOutputStream( nil);  // must clear the output side too, so Flush/Write raise "not open" after Close
 end;
 
 procedure TStreamTransportImpl.Flush;
 begin
-  if FOutputStream = nil then begin
+  if OutputStream = nil then begin
     raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );
   end;
 
-  FOutputStream.Flush;
+  OutputStream.Flush;
 end;
 
 function TStreamTransportImpl.GetInputStream: IThriftStream;
 begin
-  Result := FInputStream;
+  Result := FInternalInputStream;
+end;
+
+procedure TStreamTransportImpl.SetInputStream( const stream : IThriftStream);
+begin
+  FInternalInputStream := stream;
+  ResetMessageSizeAndConsumedBytes(-1);  // full reset to configured maximum
+  UpdateKnownMessageSize( -1);           // adjust to real stream size
+end;
+
+function TStreamTransportImpl.GetOutputStream: IThriftStream;
+begin
+  Result := FInternalOutputStream;
+end;
+
+procedure TStreamTransportImpl.SetOutputStream( const stream : IThriftStream);
+begin
+  FInternalOutputStream := stream;
 end;
 
 function TStreamTransportImpl.GetIsOpen: Boolean;
@@ -1209,11 +1239,6 @@
   Result := True;
 end;
 
-function TStreamTransportImpl.GetOutputStream: IThriftStream;
-begin
-  Result := FOutputStream;
-end;
-
 procedure TStreamTransportImpl.Open;
 begin
   // nothing to do
@@ -1221,19 +1246,36 @@
 
 function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
 begin
-  if FInputStream = nil
+  if InputStream = nil
   then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
 
-  Result := FInputStream.Read( pBuf,buflen, off, len );
+  Result := InputStream.Read( pBuf,buflen, off, len );
   CountConsumedMessageBytes( result);
 end;
 
 procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
 begin
-  if FOutputStream = nil
+  if OutputStream = nil
   then raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );
 
-  FOutputStream.Write( pBuf, off, len );
+  OutputStream.Write( pBuf, off, len );
+end;
+
+
+procedure TStreamTransportImpl.UpdateKnownMessageSize(const size : Int64);
+var adjusted : Int64;
+begin
+  if InputStream = nil
+  then adjusted := 0
+  else begin
+    adjusted := MaxMessageSize;
+    if size > 0
+    then adjusted := Math.Min( adjusted, size);
+    if InputStream.CanSeek
+    then adjusted := Math.Min( adjusted, InputStream.Size);
+  end;
+
+  inherited UpdateKnownMessageSize( adjusted);
 end;
 
 { TBufferedTransportImpl }