THRIFT-5851 Promote known total stream sizes for seekable stream transports properly
Client: Delphi
Patch: Jens Geyer
diff --git a/lib/delphi/src/Thrift.Protocol.pas b/lib/delphi/src/Thrift.Protocol.pas
index c097283..969fbcd 100644
--- a/lib/delphi/src/Thrift.Protocol.pas
+++ b/lib/delphi/src/Thrift.Protocol.pas
@@ -680,7 +680,7 @@
procedure TProtocolImpl.Reset;
begin
- FTrans.ResetConsumedMessageSize;
+ FTrans.ResetMessageSizeAndConsumedBytes;
end;
function TProtocolImpl.ReadString: string;
diff --git a/lib/delphi/src/Thrift.Stream.pas b/lib/delphi/src/Thrift.Stream.pas
index 6c1320d..5682331 100644
--- a/lib/delphi/src/Thrift.Stream.pas
+++ b/lib/delphi/src/Thrift.Stream.pas
@@ -37,7 +37,7 @@
type
IThriftStream = interface
- ['{3A61A8A6-3639-4B91-A260-EFCA23944F3A}']
+ ['{67801A9F-3B85-41CF-9025-D18AC6849B58}']
procedure Write( const buffer: TBytes; offset: Integer; count: Integer); overload;
procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); overload;
function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; overload;
@@ -47,6 +47,7 @@
procedure Flush;
function IsOpen: Boolean;
function ToArray: TBytes;
+ function CanSeek : Boolean;
function Size : Int64;
function Position : Int64;
end;
@@ -66,6 +67,7 @@
procedure Flush; virtual; abstract;
function IsOpen: Boolean; virtual; abstract;
function ToArray: TBytes; virtual; abstract;
+ function CanSeek : Boolean; virtual;
function Size : Int64; virtual;
function Position : Int64; virtual;
end;
@@ -83,6 +85,7 @@
procedure Flush; override;
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
+ function CanSeek : Boolean; override;
function Size : Int64; override;
function Position : Int64; override;
public
@@ -102,6 +105,7 @@
procedure Flush; override;
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
+ function CanSeek : Boolean; override;
function Size : Int64; override;
function Position : Int64; override;
public
@@ -176,6 +180,12 @@
end;
end;
+function TThriftStreamAdapterCOM.CanSeek : Boolean;
+var statstg: TStatStg;
+begin
+ result := IsOpen and Succeeded( FStream.Stat( statstg, STATFLAG_NONAME));
+end;
+
function TThriftStreamAdapterCOM.Size : Int64;
var statstg: TStatStg;
begin
@@ -290,6 +300,11 @@
CheckSizeAndOffset( pBuf, offset+count, offset, count);
end;
+function TThriftStreamImpl.CanSeek : Boolean;
+begin
+ result := FALSE; // TRUE indicates Size and Position are implemented
+end;
+
function TThriftStreamImpl.Size : Int64;
begin
ASSERT(FALSE);
@@ -332,6 +347,15 @@
// nothing to do
end;
+function TThriftStreamAdapterDelphi.CanSeek : Boolean;
+begin
+ try
+ result := IsOpen and (FStream.Size >= 0); // throws if not implemented
+ except
+ result := FALSE; // seek not implemented
+ end;
+end;
+
function TThriftStreamAdapterDelphi.Size : Int64;
begin
result := FStream.Size;
diff --git a/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas b/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas
index b0c7acd..2cc3bfe 100644
--- a/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas
+++ b/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas
@@ -256,7 +256,7 @@
xmlhttp.send( IUnknown( TStreamAdapter.Create( ms, soReference )));
FInputStream := nil;
FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( xmlhttp.responseStream) as IStream);
- ResetConsumedMessageSize;
+ ResetMessageSizeAndConsumedBytes;
UpdateKnownMessageSize( FInputStream.Size);
finally
ms.Free;
diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index 10f4dec..d3980d7 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -679,22 +679,22 @@
function TPipeTransportBase.GetIsOpen: Boolean;
begin
- result := (FInputStream <> nil) and (FInputStream.IsOpen)
- and (FOutputStream <> nil) and (FOutputStream.IsOpen);
+ result := (InputStream <> nil) and (InputStream.IsOpen)
+ and (OutputStream <> nil) and (OutputStream.IsOpen);
end;
procedure TPipeTransportBase.Open;
begin
- FInputStream.Open;
- FOutputStream.Open;
+ InputStream.Open;
+ OutputStream.Open;
end;
procedure TPipeTransportBase.Close;
begin
- FInputStream.Close;
- FOutputStream.Close;
+ InputStream.Close;
+ OutputStream.Close;
end;
@@ -709,8 +709,8 @@
// Named pipe constructor
begin
inherited Create( nil, nil, aConfig);
- FInputStream := TNamedPipeStreamImpl.Create( aPipeName, TRUE, aShareMode, aSecurityAttributes, aTimeOut, aOpenTimeOut);
- FOutputStream := FInputStream; // true for named pipes
+ SetInputStream( TNamedPipeStreamImpl.Create( aPipeName, TRUE, aShareMode, aSecurityAttributes, aTimeOut, aOpenTimeOut));
+ SetOutputStream( InputStream); // true for named pipes
end;
@@ -721,8 +721,8 @@
// Named pipe constructor
begin
inherited Create( nil, nil, aConfig);
- FInputStream := THandlePipeStreamImpl.Create( aPipe, aOwnsHandle, TRUE, aTimeOut);
- FOutputStream := FInputStream; // true for named pipes
+ SetInputStream( THandlePipeStreamImpl.Create( aPipe, aOwnsHandle, TRUE, aTimeOut));
+ SetOutputStream( InputStream); // true for named pipes
end;
@@ -761,8 +761,8 @@
begin
inherited Create( nil, nil, aConfig);
// overlapped is not supported with AnonPipes, see MSDN
- FInputStream := THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles, FALSE, aTimeout);
- FOutputStream := THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles, FALSE, aTimeout);
+ SetInputStream( THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles, FALSE, aTimeout));
+ SetOutputStream( THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles, FALSE, aTimeout));
end;
diff --git a/lib/delphi/src/Thrift.Transport.WinHTTP.pas b/lib/delphi/src/Thrift.Transport.WinHTTP.pas
index 87a2309..9deda4a 100644
--- a/lib/delphi/src/Thrift.Transport.WinHTTP.pas
+++ b/lib/delphi/src/Thrift.Transport.WinHTTP.pas
@@ -334,7 +334,7 @@
end;
// we're about to receive a new message, so reset everyting
- ResetConsumedMessageSize(-1);
+ ResetMessageSizeAndConsumedBytes(-1);
FInputStream := THTTPResponseStream.Create( http);
if http.QueryTotalResponseSize( dwSize) // FALSE indicates "no info available"
then UpdateKnownMessageSize( dwSize);
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
index fd08837..4ca3831 100644
--- a/lib/delphi/src/Thrift.Transport.pas
+++ b/lib/delphi/src/Thrift.Transport.pas
@@ -68,7 +68,7 @@
function Configuration : IThriftConfiguration;
function MaxMessageSize : Integer;
- procedure ResetConsumedMessageSize( const knownSize : Int64 = -1);
+ procedure ResetMessageSizeAndConsumedBytes( const knownSize : Int64 = -1);
procedure CheckReadBytesAvailable( const numBytes : Int64);
procedure UpdateKnownMessageSize( const size : Int64);
end;
@@ -106,10 +106,10 @@
function MaxMessageSize : Integer;
property RemainingMessageSize : Int64 read FRemainingMessageSize;
property KnownMessageSize : Int64 read FKnownMessageSize;
- procedure ResetConsumedMessageSize( const newSize : Int64 = -1);
+ procedure ResetMessageSizeAndConsumedBytes( const newSize : Int64 = -1);
procedure UpdateKnownMessageSize(const size : Int64); override;
- procedure CheckReadBytesAvailable(const numBytes : Int64); inline;
- procedure CountConsumedMessageBytes(const numBytes : Int64); inline;
+ procedure CheckReadBytesAvailable(const numBytes : Int64); {$IFNDEF Debug} inline; {$ENDIF}
+ procedure CountConsumedMessageBytes(const numBytes : Int64); {$IFNDEF Debug} inline; {$ENDIF}
public
constructor Create( const aConfig : IThriftConfiguration); reintroduce;
end;
@@ -124,7 +124,7 @@
function Configuration : IThriftConfiguration; override;
procedure UpdateKnownMessageSize( const size : Int64); override;
function MaxMessageSize : Integer; inline;
- procedure ResetConsumedMessageSize( const knownSize : Int64 = -1); inline;
+ procedure ResetMessageSizeAndConsumedBytes( const knownSize : Int64 = -1); inline;
procedure CheckReadBytesAvailable( const numBytes : Int64); virtual;
public
constructor Create( const aTransport: T); reintroduce;
@@ -303,14 +303,18 @@
end;
TStreamTransportImpl = class( TEndpointTransportBase, IStreamTransport)
- strict protected
- FInputStream : IThriftStream;
- FOutputStream : IThriftStream;
+ strict private
+ FInternalInputStream : IThriftStream;
+ FInternalOutputStream : IThriftStream;
+
strict protected
function GetIsOpen: Boolean; override;
- function GetInputStream: IThriftStream;
- function GetOutputStream: IThriftStream;
+ function GetInputStream: IThriftStream; inline;
+ procedure SetInputStream( const stream : IThriftStream);
+
+ function GetOutputStream: IThriftStream; inline;
+ procedure SetOutputStream( const stream : IThriftStream);
strict protected
procedure Open; override;
@@ -318,6 +322,8 @@
procedure Flush; override;
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
+
+ procedure UpdateKnownMessageSize(const size : Int64); override;
public
constructor Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration = nil); reintroduce;
destructor Destroy; override;
@@ -340,6 +346,7 @@
procedure Flush; override;
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
+ function CanSeek : Boolean; override;
function Size : Int64; override;
function Position : Int64; override;
public
@@ -545,7 +552,7 @@
then FConfiguration := aConfig
else FConfiguration := TThriftConfigurationImpl.Create;
- ResetConsumedMessageSize;
+ ResetMessageSizeAndConsumedBytes;
end;
@@ -562,7 +569,7 @@
end;
-procedure TEndpointTransportBase.ResetConsumedMessageSize( const newSize : Int64);
+procedure TEndpointTransportBase.ResetMessageSizeAndConsumedBytes( const newSize : Int64);
// Resets RemainingMessageSize to the configured maximum
begin
// full reset
@@ -583,12 +590,12 @@
procedure TEndpointTransportBase.UpdateKnownMessageSize( const size : Int64);
-// Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport).
+// Updates RemainingMessageSize to reflect the known real message size (e.g. framed transport).
// Will throw if we already consumed too many bytes.
var consumed : Int64;
begin
consumed := KnownMessageSize - RemainingMessageSize;
- ResetConsumedMessageSize(size);
+ ResetMessageSizeAndConsumedBytes(size);
CountConsumedMessageBytes(consumed);
end;
@@ -642,9 +649,9 @@
end;
-procedure TLayeredTransportBase<T>.ResetConsumedMessageSize( const knownSize : Int64 = -1);
+procedure TLayeredTransportBase<T>.ResetMessageSizeAndConsumedBytes( const knownSize : Int64 = -1);
begin
- InnerTransport.ResetConsumedMessageSize( knownSize);
+ InnerTransport.ResetMessageSizeAndConsumedBytes( knownSize);
end;
@@ -964,8 +971,8 @@
begin
inherited Close;
- FInputStream := nil;
- FOutputStream := nil;
+ SetInputStream( nil);
+ SetOutputStream( nil);
if FOwnsClient
then FreeAndNil( FClient)
@@ -997,8 +1004,8 @@
FOwnsClient := True;
stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
- FInputStream := stream;
- FOutputStream := stream;
+ SetInputStream( stream);
+ SetOutputStream( stream);
end;
procedure TSocketImpl.Open;
@@ -1026,8 +1033,8 @@
FClient.Open;
{$ENDIF}
- FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
- FOutputStream := FInputStream;
+ SetInputStream( TTcpSocketStreamImpl.Create( FClient, FTimeout));
+ SetOutputStream( InputStream); // same
end;
{ TBufferedStream }
@@ -1156,6 +1163,12 @@
end;
+function TBufferedStreamImpl.CanSeek : Boolean;
+begin
+ result := TRUE;
+end;
+
+
function TBufferedStreamImpl.Size : Int64;
begin
result := FReadBuffer.Size;
@@ -1173,35 +1186,52 @@
constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration);
begin
inherited Create( aConfig);
- FInputStream := aInputStream;
- FOutputStream := aOutputStream;
+ SetInputStream( aInputStream);
+ SetOutputStream( aOutputStream);
end;
destructor TStreamTransportImpl.Destroy;
begin
- FInputStream := nil;
- FOutputStream := nil;
+ SetInputStream( nil);
+ SetInputStream( nil);
inherited;
end;
procedure TStreamTransportImpl.Close;
begin
- FInputStream := nil;
- FOutputStream := nil;
+ SetInputStream( nil);
+ SetInputStream( nil);
end;
procedure TStreamTransportImpl.Flush;
begin
- if FOutputStream = nil then begin
+ if OutputStream = nil then begin
raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );
end;
- FOutputStream.Flush;
+ OutputStream.Flush;
end;
function TStreamTransportImpl.GetInputStream: IThriftStream;
begin
- Result := FInputStream;
+ Result := FInternalInputStream;
+end;
+
+procedure TStreamTransportImpl.SetInputStream( const stream : IThriftStream);
+begin
+ FInternalInputStream := stream;
+ ResetMessageSizeAndConsumedBytes(-1); // full reset to configured maximum
+ UpdateKnownMessageSize( -1); // adjust to real stream size
+end;
+
+function TStreamTransportImpl.GetOutputStream: IThriftStream;
+begin
+ Result := FInternalOutputStream;
+end;
+
+procedure TStreamTransportImpl.SetOutputStream( const stream : IThriftStream);
+begin
+ FInternalOutputStream := stream;
end;
function TStreamTransportImpl.GetIsOpen: Boolean;
@@ -1209,11 +1239,6 @@
Result := True;
end;
-function TStreamTransportImpl.GetOutputStream: IThriftStream;
-begin
- Result := FOutputStream;
-end;
-
procedure TStreamTransportImpl.Open;
begin
// nothing to do
@@ -1221,19 +1246,36 @@
function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
- if FInputStream = nil
+ if InputStream = nil
then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
- Result := FInputStream.Read( pBuf,buflen, off, len );
+ Result := InputStream.Read( pBuf,buflen, off, len );
CountConsumedMessageBytes( result);
end;
procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
- if FOutputStream = nil
+ if OutputStream = nil
then raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );
- FOutputStream.Write( pBuf, off, len );
+ OutputStream.Write( pBuf, off, len );
+end;
+
+
+procedure TStreamTransportImpl.UpdateKnownMessageSize(const size : Int64);
+var adjusted : Int64;
+begin
+ if InputStream = nil
+ then adjusted := 0
+ else begin
+ adjusted := MaxMessageSize;
+ if size > 0
+ then adjusted := Math.Min( adjusted, size);
+ if InputStream.CanSeek
+ then adjusted := Math.Min( adjusted, InputStream.Size);
+ end;
+
+ inherited UpdateKnownMessageSize( adjusted);
end;
{ TBufferedTransportImpl }