THRIFT-5851 Promote known total stream sizes for seekable stream transports properly
Client: Delphi
Patch: Jens Geyer
diff --git a/lib/delphi/src/Thrift.Protocol.pas b/lib/delphi/src/Thrift.Protocol.pas
index c097283..969fbcd 100644
--- a/lib/delphi/src/Thrift.Protocol.pas
+++ b/lib/delphi/src/Thrift.Protocol.pas
@@ -680,7 +680,7 @@
 
 procedure TProtocolImpl.Reset;
 begin
-  FTrans.ResetConsumedMessageSize;
+  FTrans.ResetMessageSizeAndConsumedBytes;
 end;
 
 function TProtocolImpl.ReadString: string;
diff --git a/lib/delphi/src/Thrift.Stream.pas b/lib/delphi/src/Thrift.Stream.pas
index 6c1320d..5682331 100644
--- a/lib/delphi/src/Thrift.Stream.pas
+++ b/lib/delphi/src/Thrift.Stream.pas
@@ -37,7 +37,7 @@
 
 type
   IThriftStream = interface
-    ['{3A61A8A6-3639-4B91-A260-EFCA23944F3A}']
+    ['{67801A9F-3B85-41CF-9025-D18AC6849B58}']
     procedure Write( const buffer: TBytes; offset: Integer; count: Integer);  overload;
     procedure Write( const pBuf : Pointer; offset: Integer; count: Integer);  overload;
     function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer;  overload;
@@ -47,6 +47,7 @@
     procedure Flush;
     function IsOpen: Boolean;
     function ToArray: TBytes;
+    function CanSeek : Boolean;
     function Size : Int64;
     function Position : Int64;
   end;
@@ -66,6 +67,7 @@
     procedure Flush; virtual; abstract;
     function IsOpen: Boolean; virtual; abstract;
     function ToArray: TBytes; virtual; abstract;
+    function CanSeek : Boolean;  virtual;
     function Size : Int64; virtual;
     function Position : Int64;  virtual;
   end;
@@ -83,6 +85,7 @@
     procedure Flush; override;
     function IsOpen: Boolean; override;
     function ToArray: TBytes; override;
+    function CanSeek : Boolean; override;
     function Size : Int64; override;
     function Position : Int64;  override;
   public
@@ -102,6 +105,7 @@
     procedure Flush; override;
     function IsOpen: Boolean; override;
     function ToArray: TBytes; override;
+    function CanSeek : Boolean; override;
     function Size : Int64; override;
     function Position : Int64;  override;
   public
@@ -176,6 +180,12 @@
   end;
 end;
 
+function TThriftStreamAdapterCOM.CanSeek : Boolean;
+var statstg: TStatStg;
+begin
+  result := IsOpen and Succeeded( FStream.Stat( statstg, STATFLAG_NONAME));
+end;
+
 function TThriftStreamAdapterCOM.Size : Int64;
 var statstg: TStatStg;
 begin
@@ -290,6 +300,11 @@
   CheckSizeAndOffset( pBuf, offset+count, offset, count);
 end;
 
+function TThriftStreamImpl.CanSeek : Boolean;
+begin
+  result := FALSE; // TRUE indicates Size and Position are implemented
+end;
+
 function TThriftStreamImpl.Size : Int64;
 begin
   ASSERT(FALSE);
@@ -332,6 +347,15 @@
   // nothing to do
 end;
 
+function TThriftStreamAdapterDelphi.CanSeek : Boolean;
+begin
+  try
+    result := IsOpen and (FStream.Size >= 0);  // throws if not implemented
+  except
+    result := FALSE;  // seek not implemented
+  end;
+end;
+
 function TThriftStreamAdapterDelphi.Size : Int64;
 begin
   result := FStream.Size;
diff --git a/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas b/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas
index b0c7acd..2cc3bfe 100644
--- a/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas
+++ b/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas
@@ -256,7 +256,7 @@
     xmlhttp.send( IUnknown( TStreamAdapter.Create( ms, soReference )));
     FInputStream := nil;
     FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( xmlhttp.responseStream) as IStream);
-    ResetConsumedMessageSize;
+    ResetMessageSizeAndConsumedBytes;
     UpdateKnownMessageSize( FInputStream.Size);
   finally
     ms.Free;
diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index 10f4dec..d3980d7 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -679,22 +679,22 @@
 
 function TPipeTransportBase.GetIsOpen: Boolean;
 begin
-  result := (FInputStream <> nil)  and (FInputStream.IsOpen)
-        and (FOutputStream <> nil) and (FOutputStream.IsOpen);
+  result := (InputStream <> nil)  and (InputStream.IsOpen)
+        and (OutputStream <> nil) and (OutputStream.IsOpen);
 end;
 
 
 procedure TPipeTransportBase.Open;
 begin
-  FInputStream.Open;
-  FOutputStream.Open;
+  InputStream.Open;
+  OutputStream.Open;
 end;
 
 
 procedure TPipeTransportBase.Close;
 begin
-  FInputStream.Close;
-  FOutputStream.Close;
+  InputStream.Close;
+  OutputStream.Close;
 end;
 
 
@@ -709,8 +709,8 @@
 // Named pipe constructor
 begin
   inherited Create( nil, nil, aConfig);
-  FInputStream  := TNamedPipeStreamImpl.Create( aPipeName, TRUE, aShareMode, aSecurityAttributes, aTimeOut, aOpenTimeOut);
-  FOutputStream := FInputStream;  // true for named pipes
+  SetInputStream( TNamedPipeStreamImpl.Create( aPipeName, TRUE, aShareMode, aSecurityAttributes, aTimeOut, aOpenTimeOut));
+  SetOutputStream( InputStream);  // true for named pipes
 end;
 
 
@@ -721,8 +721,8 @@
 // Named pipe constructor
 begin
   inherited Create( nil, nil, aConfig);
-  FInputStream  := THandlePipeStreamImpl.Create( aPipe, aOwnsHandle, TRUE, aTimeOut);
-  FOutputStream := FInputStream;  // true for named pipes
+  SetInputStream(  THandlePipeStreamImpl.Create( aPipe, aOwnsHandle, TRUE, aTimeOut));
+  SetOutputStream( InputStream);  // true for named pipes
 end;
 
 
@@ -761,8 +761,8 @@
 begin
   inherited Create( nil, nil, aConfig);
   // overlapped is not supported with AnonPipes, see MSDN
-  FInputStream  := THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles, FALSE, aTimeout);
-  FOutputStream := THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles, FALSE, aTimeout);
+  SetInputStream(  THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles, FALSE, aTimeout));
+  SetOutputStream( THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles, FALSE, aTimeout));
 end;
 
 
diff --git a/lib/delphi/src/Thrift.Transport.WinHTTP.pas b/lib/delphi/src/Thrift.Transport.WinHTTP.pas
index 87a2309..9deda4a 100644
--- a/lib/delphi/src/Thrift.Transport.WinHTTP.pas
+++ b/lib/delphi/src/Thrift.Transport.WinHTTP.pas
@@ -334,7 +334,7 @@
   end;
 
   // we're about to receive a new message, so reset everyting
-  ResetConsumedMessageSize(-1);
+  ResetMessageSizeAndConsumedBytes(-1);
   FInputStream := THTTPResponseStream.Create( http);
   if http.QueryTotalResponseSize( dwSize)  // FALSE indicates "no info available"
   then UpdateKnownMessageSize( dwSize);
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
index fd08837..4ca3831 100644
--- a/lib/delphi/src/Thrift.Transport.pas
+++ b/lib/delphi/src/Thrift.Transport.pas
@@ -68,7 +68,7 @@
 
     function  Configuration : IThriftConfiguration;
     function  MaxMessageSize : Integer;
-    procedure ResetConsumedMessageSize( const knownSize : Int64 = -1);
+    procedure ResetMessageSizeAndConsumedBytes( const knownSize : Int64 = -1);
     procedure CheckReadBytesAvailable( const numBytes : Int64);
     procedure UpdateKnownMessageSize( const size : Int64);
   end;
@@ -106,10 +106,10 @@
     function  MaxMessageSize : Integer;
     property  RemainingMessageSize : Int64 read FRemainingMessageSize;
     property  KnownMessageSize : Int64 read FKnownMessageSize;
-    procedure ResetConsumedMessageSize( const newSize : Int64 = -1);
+    procedure ResetMessageSizeAndConsumedBytes( const newSize : Int64 = -1);
     procedure UpdateKnownMessageSize(const size : Int64); override;
-    procedure CheckReadBytesAvailable(const numBytes : Int64); inline;
-    procedure CountConsumedMessageBytes(const numBytes : Int64); inline;
+    procedure CheckReadBytesAvailable(const numBytes : Int64); {$IFNDEF Debug} inline; {$ENDIF}
+    procedure CountConsumedMessageBytes(const numBytes : Int64); {$IFNDEF Debug} inline; {$ENDIF}
   public
     constructor Create( const aConfig : IThriftConfiguration);  reintroduce;
   end;
@@ -124,7 +124,7 @@
     function  Configuration : IThriftConfiguration; override;
     procedure UpdateKnownMessageSize( const size : Int64); override;
     function  MaxMessageSize : Integer;  inline;
-    procedure ResetConsumedMessageSize( const knownSize : Int64 = -1);  inline;
+    procedure ResetMessageSizeAndConsumedBytes( const knownSize : Int64 = -1);  inline;
     procedure CheckReadBytesAvailable( const numBytes : Int64);   virtual;
   public
     constructor Create( const aTransport: T); reintroduce;
@@ -303,14 +303,18 @@
   end;
 
   TStreamTransportImpl = class( TEndpointTransportBase, IStreamTransport)
-  strict protected
-    FInputStream : IThriftStream;
-    FOutputStream : IThriftStream;
+  strict private
+    FInternalInputStream : IThriftStream;
+    FInternalOutputStream : IThriftStream;
+
   strict protected
     function GetIsOpen: Boolean; override;
 
-    function GetInputStream: IThriftStream;
-    function GetOutputStream: IThriftStream;
+    function GetInputStream: IThriftStream; inline;
+    procedure SetInputStream( const stream : IThriftStream);
+
+    function GetOutputStream: IThriftStream; inline;
+    procedure SetOutputStream( const stream : IThriftStream);
 
   strict protected
     procedure Open; override;
@@ -318,6 +322,8 @@
     procedure Flush; override;
     function  Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
     procedure Write( const pBuf : Pointer; off, len : Integer); override;
+
+    procedure UpdateKnownMessageSize(const size : Int64); override;
   public
     constructor Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration = nil);  reintroduce;
     destructor Destroy; override;
@@ -340,6 +346,7 @@
     procedure Flush; override;
     function IsOpen: Boolean; override;
     function ToArray: TBytes; override;
+    function CanSeek : Boolean; override;
     function Size : Int64; override;
     function Position : Int64; override;
   public
@@ -545,7 +552,7 @@
   then FConfiguration := aConfig
   else FConfiguration := TThriftConfigurationImpl.Create;
 
-  ResetConsumedMessageSize;
+  ResetMessageSizeAndConsumedBytes;
 end;
 
 
@@ -562,7 +569,7 @@
 end;
 
 
-procedure TEndpointTransportBase.ResetConsumedMessageSize( const newSize : Int64);
+procedure TEndpointTransportBase.ResetMessageSizeAndConsumedBytes( const newSize : Int64);
 // Resets RemainingMessageSize to the configured maximum
 begin
   // full reset
@@ -583,12 +590,12 @@
 
 
 procedure TEndpointTransportBase.UpdateKnownMessageSize( const size : Int64);
-// Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport).
+// Updates RemainingMessageSize to reflect the known real message size (e.g. framed transport).
 // Will throw if we already consumed too many bytes.
 var consumed : Int64;
 begin
   consumed := KnownMessageSize - RemainingMessageSize;
-  ResetConsumedMessageSize(size);
+  ResetMessageSizeAndConsumedBytes(size);
   CountConsumedMessageBytes(consumed);
 end;
 
@@ -642,9 +649,9 @@
 end;
 
 
-procedure TLayeredTransportBase<T>.ResetConsumedMessageSize( const knownSize : Int64 = -1);
+procedure TLayeredTransportBase<T>.ResetMessageSizeAndConsumedBytes( const knownSize : Int64 = -1);
 begin
-  InnerTransport.ResetConsumedMessageSize( knownSize);
+  InnerTransport.ResetMessageSizeAndConsumedBytes( knownSize);
 end;
 
 
@@ -964,8 +971,8 @@
 begin
   inherited Close;
 
-  FInputStream := nil;
-  FOutputStream := nil;
+  SetInputStream( nil);
+  SetOutputStream( nil);
 
   if FOwnsClient
   then FreeAndNil( FClient)
@@ -997,8 +1004,8 @@
   FOwnsClient := True;
 
   stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
-  FInputStream := stream;
-  FOutputStream := stream;
+  SetInputStream( stream);
+  SetOutputStream( stream);
 end;
 
 procedure TSocketImpl.Open;
@@ -1026,8 +1033,8 @@
   FClient.Open;
 {$ENDIF}
 
-  FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
-  FOutputStream := FInputStream;
+  SetInputStream( TTcpSocketStreamImpl.Create( FClient, FTimeout));
+  SetOutputStream( InputStream);  // same
 end;
 
 { TBufferedStream }
@@ -1156,6 +1163,12 @@
 end;
 
 
+function TBufferedStreamImpl.CanSeek : Boolean;
+begin
+  result := TRUE;
+end;
+
+
 function TBufferedStreamImpl.Size : Int64;
 begin
   result := FReadBuffer.Size;
@@ -1173,35 +1186,52 @@
 constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration);
 begin
   inherited Create( aConfig);
-  FInputStream := aInputStream;
-  FOutputStream := aOutputStream;
+  SetInputStream( aInputStream);
+  SetOutputStream( aOutputStream);
 end;
 
 destructor TStreamTransportImpl.Destroy;
 begin
-  FInputStream := nil;
-  FOutputStream := nil;
+  SetInputStream( nil);
+  SetOutputStream( nil);  // release the output stream as well, not the input stream twice
   inherited;
 end;
 
 procedure TStreamTransportImpl.Close;
 begin
-  FInputStream := nil;
-  FOutputStream := nil;
+  SetInputStream( nil);
+  SetOutputStream( nil);  // must clear the output stream so Flush/Write raise "not open" after Close
 end;
 
 procedure TStreamTransportImpl.Flush;
 begin
-  if FOutputStream = nil then begin
+  if OutputStream = nil then begin
     raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );
   end;
 
-  FOutputStream.Flush;
+  OutputStream.Flush;
 end;
 
 function TStreamTransportImpl.GetInputStream: IThriftStream;
 begin
-  Result := FInputStream;
+  Result := FInternalInputStream;
+end;
+
+procedure TStreamTransportImpl.SetInputStream( const stream : IThriftStream);
+begin
+  FInternalInputStream := stream;
+  ResetMessageSizeAndConsumedBytes(-1);  // full reset to configured maximum
+  UpdateKnownMessageSize( -1);           // adjust to real stream size
+end;
+
+function TStreamTransportImpl.GetOutputStream: IThriftStream;
+begin
+  Result := FInternalOutputStream;
+end;
+
+procedure TStreamTransportImpl.SetOutputStream( const stream : IThriftStream);
+begin
+  FInternalOutputStream := stream;
 end;
 
 function TStreamTransportImpl.GetIsOpen: Boolean;
@@ -1209,11 +1239,6 @@
   Result := True;
 end;
 
-function TStreamTransportImpl.GetOutputStream: IThriftStream;
-begin
-  Result := FOutputStream;
-end;
-
 procedure TStreamTransportImpl.Open;
 begin
   // nothing to do
@@ -1221,19 +1246,36 @@
 
 function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
 begin
-  if FInputStream = nil
+  if InputStream = nil
   then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
 
-  Result := FInputStream.Read( pBuf,buflen, off, len );
+  Result := InputStream.Read( pBuf,buflen, off, len );
   CountConsumedMessageBytes( result);
 end;
 
 procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
 begin
-  if FOutputStream = nil
+  if OutputStream = nil
   then raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );
 
-  FOutputStream.Write( pBuf, off, len );
+  OutputStream.Write( pBuf, off, len );
+end;
+
+
+procedure TStreamTransportImpl.UpdateKnownMessageSize(const size : Int64);
+var adjusted : Int64;  // the size actually handed to the inherited implementation
+begin
+  if InputStream = nil
+  then adjusted := 0  // no input stream assigned: report a known size of zero
+  else begin
+    adjusted := MaxMessageSize;  // start from the configured maximum
+    if size > 0  // non-positive sizes (e.g. -1) are ignored here
+    then adjusted := Math.Min( adjusted, size);
+    if InputStream.CanSeek  // Size is only queried when the stream says it can seek
+    then adjusted := Math.Min( adjusted, InputStream.Size);
+  end;
+
+  inherited UpdateKnownMessageSize( adjusted);
+end;
 
 { TBufferedTransportImpl }