THRIFT-5012 Centralize configuration aspects into a commonly used configuration object [ci skip]
Client: Delphi
Patch: Jens Geyer
This closes #1955
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
index 0a9a39e..af62548 100644
--- a/lib/delphi/src/Thrift.Transport.pas
+++ b/lib/delphi/src/Thrift.Transport.pas
@@ -38,6 +38,7 @@
Thrift.Socket,
{$ENDIF}
{$ENDIF}
+ Thrift.Configuration,
Thrift.Collections,
Thrift.Exception,
Thrift.Utils,
@@ -49,28 +50,10 @@
DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
type
- ITransportControl = interface
- ['{CDA35E2C-F1D2-4BE3-9927-7F1540923265}']
- function MaxAllowedMessageSize : Integer;
- procedure ConsumeReadBytes( const count : Integer);
- procedure ResetConsumedMessageSize;
- end;
-
- TTransportControlImpl = class( TInterfacedObject, ITransportControl)
- strict private
- FMaxAllowedMsgSize : Integer;
- FRemainingMsgSize : Integer;
- strict protected
- // ITransportControl
- function MaxAllowedMessageSize : Integer;
- procedure ConsumeReadBytes( const count : Integer);
- procedure ResetConsumedMessageSize;
- public
- constructor Create( const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE); reintroduce;
- end;
+ IStreamTransport = interface;
ITransport = interface
- ['{938F6EB5-1848-43D5-8AC4-07633C55B229}']
+ ['{52F81383-F880-492F-8AA7-A66B85B93D6B}']
function GetIsOpen: Boolean;
property IsOpen: Boolean read GetIsOpen;
function Peek: Boolean;
@@ -87,14 +70,14 @@
procedure Write( const pBuf : Pointer; len : Integer); overload;
procedure Flush;
- function TransportControl : ITransportControl;
- procedure CheckReadBytesAvailable( const value : Integer);
+ function Configuration : IThriftConfiguration;
+ function MaxMessageSize : Integer;
+ procedure ResetConsumedMessageSize( const knownSize : Int64 = -1);
+ procedure CheckReadBytesAvailable( const numBytes : Int64);
+ procedure UpdateKnownMessageSize( const size : Int64);
end;
- TTransportImpl = class( TInterfacedObject, ITransport)
- strict private
- FTransportControl : ITransportControl;
-
+ TTransportBase = class abstract( TInterfacedObject)
strict protected
function GetIsOpen: Boolean; virtual; abstract;
property IsOpen: Boolean read GetIsOpen;
@@ -112,12 +95,44 @@
procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract;
procedure Flush; virtual;
- function TransportControl : ITransportControl; inline;
- procedure ConsumeReadBytes( const count : Integer); inline;
- procedure CheckReadBytesAvailable( const value : Integer); virtual; abstract;
+ function Configuration : IThriftConfiguration; virtual; abstract;
+ procedure UpdateKnownMessageSize( const size : Int64); virtual; abstract;
+ end;
+ // base class for all endpoint transports, e.g. sockets, pipes or HTTP
+ TEndpointTransportBase = class abstract( TTransportBase, ITransport)
+ strict private
+ FRemainingMessageSize : Int64;
+ FKnownMessageSize : Int64;
+ FConfiguration : IThriftConfiguration;
+ strict protected
+ function Configuration : IThriftConfiguration; override;
+ function MaxMessageSize : Integer;
+ property RemainingMessageSize : Int64 read FRemainingMessageSize;
+ property KnownMessageSize : Int64 read FKnownMessageSize;
+ procedure ResetConsumedMessageSize( const newSize : Int64 = -1); inline;
+ procedure UpdateKnownMessageSize(const size : Int64); override;
+ procedure CheckReadBytesAvailable(const numBytes : Int64); inline;
+ procedure CountConsumedMessageBytes(const numBytes : Int64); inline;
public
- constructor Create( const aTransportCtl : ITransportControl); reintroduce;
+ constructor Create( const aConfig : IThriftConfiguration); reintroduce;
+ end;
+
+ // base class for all layered transports, e.g. framed
+ TLayeredTransportBase<T : ITransport> = class abstract( TTransportBase, ITransport)
+ strict private
+ FTransport : T;
+ strict protected
+ property InnerTransport : T read FTransport;
+ function GetUnderlyingTransport: ITransport;
+ function Configuration : IThriftConfiguration; override;
+ procedure UpdateKnownMessageSize( const size : Int64); override;
+ function MaxMessageSize : Integer; inline;
+ procedure ResetConsumedMessageSize( const knownSize : Int64 = -1); inline;
+ procedure CheckReadBytesAvailable( const numBytes : Int64); virtual;
+ public
+ constructor Create( const aTransport: T); reintroduce;
+ property UnderlyingTransport: ITransport read GetUnderlyingTransport;
end;
TTransportException = class abstract( TException)
@@ -220,17 +235,23 @@
end;
IServerTransport = interface
- ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
+ ['{FA01363F-6B40-482F-971E-4A085535EFC8}']
procedure Listen;
procedure Close;
function Accept( const fnAccepting: TProc): ITransport;
+ function Configuration : IThriftConfiguration;
end;
TServerTransportImpl = class( TInterfacedObject, IServerTransport)
+ strict private
+ FConfig : IThriftConfiguration;
strict protected
+ function Configuration : IThriftConfiguration;
procedure Listen; virtual; abstract;
procedure Close; virtual; abstract;
- function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
+ function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
+ public
+ constructor Create( const aConfig : IThriftConfiguration);
end;
ITransportFactory = interface
@@ -238,11 +259,13 @@
function GetTransport( const aTransport: ITransport): ITransport;
end;
- TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
+ TTransportFactoryImpl = class ( TInterfacedObject, ITransportFactory)
+ strict protected
function GetTransport( const aTransport: ITransport): ITransport; virtual;
end;
- TTcpSocketStreamImpl = class( TThriftStreamImpl )
+
+ TTcpSocketStreamImpl = class( TThriftStreamImpl)
{$IFDEF OLD_SOCKETS}
strict private type
TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
@@ -261,7 +284,6 @@
strict protected
procedure Write( const pBuf : Pointer; offset, count: Integer); override;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
- procedure CheckReadBytesAvailable( const value : Integer); override;
procedure Open; override;
procedure Close; override;
procedure Flush; override;
@@ -270,9 +292,9 @@
function ToArray: TBytes; override;
public
{$IFDEF OLD_SOCKETS}
- constructor Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer = 0);
+ constructor Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer = DEFAULT_THRIFT_TIMEOUT);
{$ELSE}
- constructor Create( const aTcpClient: TSocket; const aTimeout : Longword = 0);
+ constructor Create( const aTcpClient: TSocket; const aTimeout : Longword = DEFAULT_THRIFT_TIMEOUT);
{$ENDIF}
end;
@@ -284,7 +306,7 @@
property OutputStream : IThriftStream read GetOutputStream;
end;
- TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
+ TStreamTransportImpl = class( TEndpointTransportBase, IStreamTransport)
strict protected
FInputStream : IThriftStream;
FOutputStream : IThriftStream;
@@ -294,7 +316,6 @@
function GetInputStream: IThriftStream;
function GetOutputStream: IThriftStream;
- procedure CheckReadBytesAvailable( const value : Integer); override;
strict protected
procedure Open; override;
procedure Close; override;
@@ -302,7 +323,7 @@
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
public
- constructor Create( const aInputStream, aOutputStream : IThriftStream; const aTransportCtl : ITransportControl = nil);
+ constructor Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration = nil); reintroduce;
destructor Destroy; override;
property InputStream : IThriftStream read GetInputStream;
@@ -318,12 +339,13 @@
strict protected
procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
- procedure CheckReadBytesAvailable( const value : Integer); override;
procedure Open; override;
procedure Close; override;
procedure Flush; override;
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
+ function Size : Int64; override;
+ function Position : Int64; override;
public
constructor Create( const aStream: IThriftStream; const aBufSize : Integer);
destructor Destroy; override;
@@ -340,38 +362,34 @@
{$ENDIF}
FUseBufferedSocket : Boolean;
FOwnsServer : Boolean;
- FTransportControl : ITransportControl;
strict protected
function Accept( const fnAccepting: TProc) : ITransport; override;
- property TransportControl : ITransportControl read FTransportControl;
public
-{$IFDEF OLD_SOCKETS}
- constructor Create( const aServer: TTcpServer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
- constructor Create( const aPort: Integer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aTransportCtl : ITransportControl = nil); overload;
-{$ELSE}
- constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
- constructor Create( const aPort: Integer; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aTransportCtl : ITransportControl = nil); overload;
-{$ENDIF}
+ {$IFDEF OLD_SOCKETS}
+ constructor Create( const aServer: TTcpServer; const aClientTimeout : Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
+ constructor Create( const aPort: Integer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil); overload;
+ {$ELSE}
+ constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
+ constructor Create( const aPort: Integer; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil); overload;
+ {$ENDIF}
+
destructor Destroy; override;
procedure Listen; override;
procedure Close; override;
end;
- TBufferedTransportImpl = class( TTransportImpl )
+ TBufferedTransportImpl = class( TLayeredTransportBase<IStreamTransport>)
strict private
FInputBuffer : IThriftStream;
FOutputBuffer : IThriftStream;
- FTransport : IStreamTransport;
FBufSize : Integer;
procedure InitBuffers;
- function GetUnderlyingTransport: ITransport;
strict protected
function GetIsOpen: Boolean; override;
procedure Flush; override;
- procedure CheckReadBytesAvailable( const value : Integer); override;
public
type
TFactory = class( TTransportFactoryImpl )
@@ -384,7 +402,7 @@
procedure Close(); override;
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
- property UnderlyingTransport: ITransport read GetUnderlyingTransport;
+ procedure CheckReadBytesAvailable( const value : Int64); override;
property IsOpen: Boolean read GetIsOpen;
end;
@@ -408,15 +426,16 @@
strict protected
function GetIsOpen: Boolean; override;
public
- procedure Open; override;
{$IFDEF OLD_SOCKETS}
- constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
- constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
+ constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
+ constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
{$ELSE}
- constructor Create(const aClient: TSocket; const aOwnsClient: Boolean; const aTransportCtl : ITransportControl = nil); overload;
- constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
+ constructor Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration = nil); overload;
+ constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
{$ENDIF}
destructor Destroy; override;
+
+ procedure Open; override;
procedure Close; override;
{$IFDEF OLD_SOCKETS}
property TcpClient: TCustomIpClient read FClient;
@@ -427,29 +446,25 @@
property Port: Integer read FPort;
end;
- TFramedTransportImpl = class( TTransportImpl)
- strict protected const
- DEFAULT_MAX_LENGTH = 16384000; // this value is used by all Thrift libraries
+ TFramedTransportImpl = class( TLayeredTransportBase<ITransport>)
strict protected type
TFramedHeader = Int32;
strict protected
- FTransport : ITransport;
FWriteBuffer : TMemoryStream;
FReadBuffer : TMemoryStream;
- FMaxFrameSize : Integer;
- procedure InitMaxFrameSize;
procedure InitWriteBuffer;
procedure ReadFrame;
procedure Open(); override;
- function GetIsOpen: Boolean; override;
+ function GetIsOpen: Boolean; override;
procedure Close(); override;
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
+ procedure CheckReadBytesAvailable( const value : Int64); override;
procedure Flush; override;
- procedure CheckReadBytesAvailable( const value : Integer); override;
+
public
type
TFactory = class( TTransportFactoryImpl )
@@ -457,7 +472,6 @@
function GetTransport( const aTransport: ITransport): ITransport; override;
end;
- constructor Create( const aTransportCtl : ITransportControl); overload;
constructor Create( const aTransport: ITransport); overload;
destructor Destroy; override;
end;
@@ -469,80 +483,33 @@
implementation
-{ TTransportControlImpl }
+{ TTransportBase }
-constructor TTransportControlImpl.Create( const aMaxMessageSize : Integer);
-begin
- inherited Create;
-
- if aMaxMessageSize > 0
- then FMaxAllowedMsgSize := aMaxMessageSize
- else FMaxAllowedMsgSize := DEFAULT_MAX_MESSAGE_SIZE;
-
- ResetConsumedMessageSize;
-end;
-
-function TTransportControlImpl.MaxAllowedMessageSize : Integer;
-begin
- result := FMaxAllowedMsgSize;
-end;
-
-procedure TTransportControlImpl.ResetConsumedMessageSize;
-begin
- FRemainingMsgSize := MaxAllowedMessageSize;
-end;
-
-
-procedure TTransportControlImpl.ConsumeReadBytes( const count : Integer);
-begin
- if FRemainingMsgSize >= count
- then Dec( FRemainingMsgSize, count)
- else begin
- FRemainingMsgSize := 0;
- if FRemainingMsgSize < count
- then raise TTransportExceptionEndOfFile.Create('Maximum message size reached');
- end;
-end;
-
-
-{ TTransportImpl }
-
-constructor TTransportImpl.Create( const aTransportCtl : ITransportControl);
-begin
- inherited Create;
-
- if aTransportCtl <> nil
- then FTransportControl := aTransportCtl
- else FTransportControl := TTransportControlImpl.Create;
- ASSERT( FTransportControl <> nil);
-end;
-
-
-procedure TTransportImpl.Flush;
+procedure TTransportBase.Flush;
begin
// nothing to do
end;
-function TTransportImpl.Peek: Boolean;
+function TTransportBase.Peek: Boolean;
begin
Result := IsOpen;
end;
-function TTransportImpl.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
+function TTransportBase.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
begin
if Length(buf) > 0
then result := Read( @buf[0], Length(buf), off, len)
else result := 0;
end;
-function TTransportImpl.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
+function TTransportBase.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
begin
if Length(buf) > 0
then result := ReadAll( @buf[0], Length(buf), off, len)
else result := 0;
end;
-function TTransportImpl.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
+function TTransportBase.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
var ret : Integer;
begin
result := 0;
@@ -554,37 +521,144 @@
end;
end;
-procedure TTransportImpl.Write( const buf: TBytes);
+procedure TTransportBase.Write( const buf: TBytes);
begin
if Length(buf) > 0
then Write( @buf[0], 0, Length(buf));
end;
-procedure TTransportImpl.Write( const buf: TBytes; off: Integer; len: Integer);
+procedure TTransportBase.Write( const buf: TBytes; off: Integer; len: Integer);
begin
if Length(buf) > 0
then Write( @buf[0], off, len);
end;
-procedure TTransportImpl.Write( const pBuf : Pointer; len : Integer);
+procedure TTransportBase.Write( const pBuf : Pointer; len : Integer);
begin
Self.Write( pBuf, 0, len);
end;
-function TTransportImpl.TransportControl : ITransportControl;
+{ TEndpointTransportBase }
+
+constructor TEndpointTransportBase.Create( const aConfig : IThriftConfiguration);
begin
- result := FTransportControl;
+ inherited Create;
+
+ if aConfig <> nil
+ then FConfiguration := aConfig
+ else FConfiguration := TThriftConfigurationImpl.Create;
+
+ ResetConsumedMessageSize;
end;
-procedure TTransportImpl.ConsumeReadBytes( const count : Integer);
+function TEndpointTransportBase.Configuration : IThriftConfiguration;
begin
- if FTransportControl <> nil
- then FTransportControl.ConsumeReadBytes( count);
+ result := FConfiguration;
end;
+function TEndpointTransportBase.MaxMessageSize : Integer;
+begin
+ ASSERT( Configuration <> nil);
+ result := Configuration.MaxMessageSize;
+end;
+
+
+procedure TEndpointTransportBase.ResetConsumedMessageSize( const newSize : Int64);
+// Resets RemainingMessageSize to the configured maximum
+begin
+ // full reset
+ if newSize < 0 then begin
+ FKnownMessageSize := MaxMessageSize;
+ FRemainingMessageSize := MaxMessageSize;
+ Exit;
+ end;
+
+ // update only: message size can shrink, but not grow
+ ASSERT( KnownMessageSize <= MaxMessageSize);
+ if newSize > KnownMessageSize
+ then TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
+
+ FKnownMessageSize := newSize;
+ FRemainingMessageSize := newSize;
+end;
+
+
+procedure TEndpointTransportBase.UpdateKnownMessageSize( const size : Int64);
+// Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport).
+// Will throw if we already consumed too many bytes.
+var consumed : Int64;
+begin
+ consumed := KnownMessageSize - RemainingMessageSize;
+ ResetConsumedMessageSize(size);
+ CountConsumedMessageBytes(consumed);
+end;
+
+
+procedure TEndpointTransportBase.CheckReadBytesAvailable( const numBytes : Int64);
+// Throws if there are not enough bytes in the input stream to satisfy a read of numBytes bytes of data
+begin
+ if RemainingMessageSize < numBytes
+ then raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
+end;
+
+
+procedure TEndpointTransportBase.CountConsumedMessageBytes( const numBytes : Int64);
+// Consumes numBytes from the RemainingMessageSize.
+begin
+ if (RemainingMessageSize >= numBytes)
+ then Dec( FRemainingMessageSize, numBytes)
+ else begin
+ FRemainingMessageSize := 0;
+ raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
+ end;
+end;
+
+{ TLayeredTransportBase }
+
+constructor TLayeredTransportBase<T>.Create( const aTransport: T);
+begin
+ inherited Create;
+ FTransport := aTransport;
+end;
+
+function TLayeredTransportBase<T>.GetUnderlyingTransport: ITransport;
+begin
+ result := InnerTransport;
+end;
+
+function TLayeredTransportBase<T>.Configuration : IThriftConfiguration;
+begin
+ result := InnerTransport.Configuration;
+end;
+
+procedure TLayeredTransportBase<T>.UpdateKnownMessageSize( const size : Int64);
+begin
+ InnerTransport.UpdateKnownMessageSize( size);
+end;
+
+
+function TLayeredTransportBase<T>.MaxMessageSize : Integer;
+begin
+ result := InnerTransport.MaxMessageSize;
+end;
+
+
+procedure TLayeredTransportBase<T>.ResetConsumedMessageSize( const knownSize : Int64 = -1);
+begin
+ InnerTransport.ResetConsumedMessageSize( knownSize);
+end;
+
+
+procedure TLayeredTransportBase<T>.CheckReadBytesAvailable( const numBytes : Int64);
+begin
+ InnerTransport.CheckReadBytesAvailable( numBytes);
+end;
+
+
+
{ TTransportException }
constructor TTransportException.HiddenCreate(const Msg: string);
@@ -676,18 +750,33 @@
Result := aTransport;
end;
+
+{ TServerTransportImpl }
+
+constructor TServerTransportImpl.Create( const aConfig : IThriftConfiguration);
+begin
+ inherited Create;
+ if aConfig <> nil
+ then FConfig := aConfig
+ else FConfig := TThriftConfigurationImpl.Create;
+end;
+
+function TServerTransportImpl.Configuration : IThriftConfiguration;
+begin
+ result := FConfig;
+end;
+
{ TServerSocket }
{$IFDEF OLD_SOCKETS}
-constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer; const aTransportCtl : ITransportControl);
+constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer; const aConfig : IThriftConfiguration);
{$ELSE}
-constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword; const aTransportCtl : ITransportControl);
+constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword; const aConfig : IThriftConfiguration);
{$ENDIF}
begin
- inherited Create;
+ inherited Create( aConfig);
FServer := aServer;
- FTransportControl := aTransportCtl;
- ASSERT( FTransportControl <> nil);
+
{$IFDEF OLD_SOCKETS}
FClientTimeout := aClientTimeout;
@@ -699,17 +788,12 @@
{$IFDEF OLD_SOCKETS}
-constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; aUseBufferedSockets: Boolean; const aTransportCtl : ITransportControl);
+constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration);
{$ELSE}
-constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; aUseBufferedSockets: Boolean; const aTransportCtl : ITransportControl);
+constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration);
{$ENDIF}
begin
- inherited Create;
-
- if aTransportCtl <> nil
- then FTransportControl := aTransportCtl
- else FTransportControl := TTransportControlImpl.Create;
- ASSERT( FTransportControl <> nil);
+ inherited Create( aConfig);
{$IFDEF OLD_SOCKETS}
FPort := aPort;
@@ -772,7 +856,7 @@
Exit;
end;
- trans := TSocketImpl.Create( client, TRUE, FClientTimeout, TransportControl);
+ trans := TSocketImpl.Create( client, TRUE, FClientTimeout, Configuration);
client := nil; // trans owns it now
if FUseBufferedSocket
@@ -791,7 +875,7 @@
client := FServer.Accept;
try
- trans := TSocketImpl.Create(client, True, TransportControl);
+ trans := TSocketImpl.Create(client, TRUE, Configuration);
client := nil;
if FUseBufferedSocket then
@@ -840,9 +924,9 @@
{ TSocket }
{$IFDEF OLD_SOCKETS}
-constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer; const aTransportCtl : ITransportControl);
+constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer; const aConfig : IThriftConfiguration);
{$ELSE}
-constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean; const aTransportCtl : ITransportControl);
+constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration);
{$ENDIF}
var stream : IThriftStream;
begin
@@ -856,16 +940,17 @@
{$ENDIF}
stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
- inherited Create( stream, stream, aTransportCtl);
+ inherited Create( stream, stream, aConfig);
end;
+
{$IFDEF OLD_SOCKETS}
-constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer; const aTransportCtl : ITransportControl);
+constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer; const aConfig : IThriftConfiguration);
{$ELSE}
-constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword; const aTransportCtl : ITransportControl);
+constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword; const aConfig : IThriftConfiguration);
{$ENDIF}
begin
- inherited Create(nil,nil, aTransportCtl);
+ inherited Create(nil,nil, aConfig);
FHost := aHost;
FPort := aPort;
FTimeout := aTimeout;
@@ -1043,30 +1128,12 @@
end;
-procedure TBufferedStreamImpl.CheckReadBytesAvailable( const value : Integer);
-var nRequired : Integer;
-begin
- nRequired := value;
-
- if FReadBuffer <> nil then begin
- Dec( nRequired, (FReadBuffer.Position - FReadBuffer.Size));
- if nRequired <= 0 then Exit;
- end;
-
- if FStream <> nil
- then FStream.CheckReadBytesAvailable( nRequired)
- else raise TTransportExceptionEndOfFile.Create('Not enough input data');
-end;
-
-
function TBufferedStreamImpl.ToArray: TBytes;
var len : Integer;
begin
- len := 0;
-
- if IsOpen then begin
- len := FReadBuffer.Size;
- end;
+ if IsOpen
+ then len := FReadBuffer.Size
+ else len := 0;
SetLength( Result, len);
@@ -1092,11 +1159,24 @@
end;
end;
+
+function TBufferedStreamImpl.Size : Int64;
+begin
+ result := FReadBuffer.Size;
+end;
+
+
+function TBufferedStreamImpl.Position : Int64;
+begin
+ result := FReadBuffer.Position;
+end;
+
+
{ TStreamTransportImpl }
-constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aTransportCtl : ITransportControl);
+constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration);
begin
- inherited Create( aTransportCtl);
+ inherited Create( aConfig);
FInputStream := aInputStream;
FOutputStream := aOutputStream;
end;
@@ -1149,7 +1229,7 @@
then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
Result := FInputStream.Read( pBuf,buflen, off, len );
- ConsumeReadBytes( result);
+ CountConsumedMessageBytes( result);
end;
procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
@@ -1160,28 +1240,19 @@
FOutputStream.Write( pBuf, off, len );
end;
-procedure TStreamTransportImpl.CheckReadBytesAvailable( const value : Integer);
-begin
- if FInputStream <> nil
- then FInputStream.CheckReadBytesAvailable( value)
- else raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
-end;
-
-
{ TBufferedTransportImpl }
constructor TBufferedTransportImpl.Create( const aTransport : IStreamTransport; const aBufSize: Integer);
begin
ASSERT( aTransport <> nil);
- inherited Create( aTransport.TransportControl);
- FTransport := aTransport;
+ inherited Create( aTransport);
FBufSize := aBufSize;
InitBuffers;
end;
procedure TBufferedTransportImpl.Close;
begin
- FTransport.Close;
+ InnerTransport.Close;
FInputBuffer := nil;
FOutputBuffer := nil;
end;
@@ -1195,34 +1266,29 @@
function TBufferedTransportImpl.GetIsOpen: Boolean;
begin
- Result := FTransport.IsOpen;
-end;
-
-function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
-begin
- Result := FTransport;
+ Result := InnerTransport.IsOpen;
end;
procedure TBufferedTransportImpl.InitBuffers;
begin
- if FTransport.InputStream <> nil then begin
- FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize );
+ if InnerTransport.InputStream <> nil then begin
+ FInputBuffer := TBufferedStreamImpl.Create( InnerTransport.InputStream, FBufSize );
end;
- if FTransport.OutputStream <> nil then begin
- FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize );
+ if InnerTransport.OutputStream <> nil then begin
+ FOutputBuffer := TBufferedStreamImpl.Create( InnerTransport.OutputStream, FBufSize );
end;
end;
procedure TBufferedTransportImpl.Open;
begin
- FTransport.Open;
+ InnerTransport.Open;
InitBuffers; // we need to get the buffers to match FTransport substreams again
end;
function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
if FInputBuffer <> nil
- then Result := FInputBuffer.Read( pBuf,buflen, off, len)
+ then Result := FInputBuffer.Read( pBuf,buflen, off, len )
else Result := 0;
end;
@@ -1233,23 +1299,18 @@
end;
end;
-procedure TBufferedTransportImpl.CheckReadBytesAvailable( const value : Integer);
-var stm2 : IThriftStream2;
- need : Integer;
+procedure TBufferedTransportImpl.CheckReadBytesAvailable( const value : Int64);
+var buffered, need : Int64;
begin
need := value;
// buffered bytes
- if Supports( FInputBuffer, IThriftStream2, stm2) then begin
- Dec( need, stm2.Size - stm2.Position);
- if need <= 0 then Exit;
- end;
-
- if FInputBuffer <> nil
- then FInputBuffer.CheckReadBytesAvailable( need)
- else raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
+ buffered := FInputBuffer.Size - FInputBuffer.Position;
+ if buffered < need
+ then InnerTransport.CheckReadBytesAvailable( need - buffered);
end;
+
{ TBufferedTransportImpl.TFactory }
function TBufferedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
@@ -1260,53 +1321,33 @@
{ TFramedTransportImpl }
-constructor TFramedTransportImpl.Create( const aTransportCtl : ITransportControl);
-begin
- inherited Create( aTransportCtl);
-
- InitMaxFrameSize;
- InitWriteBuffer;
-end;
-
constructor TFramedTransportImpl.Create( const aTransport: ITransport);
begin
ASSERT( aTransport <> nil);
- inherited Create( aTransport.TransportControl);
+ inherited Create( aTransport);
- InitMaxFrameSize;
InitWriteBuffer;
- FTransport := aTransport;
end;
destructor TFramedTransportImpl.Destroy;
begin
FWriteBuffer.Free;
+ FWriteBuffer := nil;
FReadBuffer.Free;
+ FReadBuffer := nil;
inherited;
end;
-procedure TFramedTransportImpl.InitMaxFrameSize;
-var maxLen : Integer;
-begin
- FMaxFrameSize := DEFAULT_MAX_LENGTH;
-
- // MaxAllowedMessageSize may be smaller, but not larger
- if TransportControl <> nil then begin
- maxLen := TransportControl.MaxAllowedMessageSize - SizeOf(TFramedHeader);
- FMaxFrameSize := Min( FMaxFrameSize, maxLen);
- end;
-end;
-
procedure TFramedTransportImpl.Close;
begin
- FTransport.Close;
+ InnerTransport.Close;
end;
procedure TFramedTransportImpl.Flush;
var
buf : TBytes;
len : Integer;
- data_len : Integer;
+ data_len : Int64;
begin
if not IsOpen
then raise TTransportExceptionNotOpen.Create('not open');
@@ -1318,9 +1359,9 @@
end;
data_len := len - SizeOf(TFramedHeader);
- if (data_len < 0) then begin
- raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: data_len < 0' );
- end;
+ if (0 > data_len) or (data_len > Configuration.MaxFrameSize)
+ then raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: invalid frame size ('+IntToStr(data_len)+')')
+ else UpdateKnownMessageSize( len);
InitWriteBuffer;
@@ -1329,13 +1370,13 @@
buf[2] := Byte($FF and (data_len shr 8));
buf[3] := Byte($FF and data_len);
- FTransport.Write( buf, 0, len );
- FTransport.Flush;
+ InnerTransport.Write( buf, 0, len );
+ InnerTransport.Flush;
end;
function TFramedTransportImpl.GetIsOpen: Boolean;
begin
- Result := FTransport.IsOpen;
+ Result := InnerTransport.IsOpen;
end;
type
@@ -1353,7 +1394,7 @@
procedure TFramedTransportImpl.Open;
begin
- FTransport.Open;
+ InnerTransport.Open;
end;
function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
@@ -1382,7 +1423,7 @@
size : Integer;
buff : TBytes;
begin
- FTransport.ReadAll( @i32rd[0], SizeOf(i32rd), 0, SizeOf(i32rd));
+ InnerTransport.ReadAll( @i32rd[0], SizeOf(i32rd), 0, SizeOf(i32rd));
size :=
((i32rd[0] and $FF) shl 24) or
((i32rd[1] and $FF) shl 16) or
@@ -1394,14 +1435,15 @@
raise TTransportExceptionCorruptedData.Create('Read a negative frame size ('+IntToStr(size)+')');
end;
- if size > FMaxFrameSize then begin
+ if Int64(size) > Int64(Configuration.MaxFrameSize) then begin
Close();
- raise TTransportExceptionCorruptedData.Create('Frame size ('+IntToStr(size)+') larger than allowed maximum ('+IntToStr(FMaxFrameSize)+')');
+ raise TTransportExceptionCorruptedData.Create('Frame size ('+IntToStr(size)+') larger than allowed maximum ('+IntToStr(Configuration.MaxFrameSize)+')');
end;
- FTransport.CheckReadBytesAvailable( size);
+ UpdateKnownMessageSize(size + SizeOf(size));
+
SetLength( buff, size );
- FTransport.ReadAll( buff, 0, size );
+ InnerTransport.ReadAll( buff, 0, size );
FreeAndNil( FReadBuffer);
FReadBuffer := TMemoryStream.Create;
@@ -1422,15 +1464,15 @@
end;
-procedure TFramedTransportImpl.CheckReadBytesAvailable( const value : Integer);
-var nRemaining : Int64;
+procedure TFramedTransportImpl.CheckReadBytesAvailable( const value : Int64);
+var buffered, need : Int64;
begin
- if FReadBuffer = nil
- then raise TTransportExceptionEndOfFile.Create('Cannot read from null inputstream');
+ need := value;
- nRemaining := FReadBuffer.Size - FReadBuffer.Position;
- if value > nRemaining
- then raise TTransportExceptionEndOfFile.Create('Not enough input data');
+ // buffered bytes
+ buffered := FReadBuffer.Size - FReadBuffer.Position;
+ if buffered < need
+ then InnerTransport.CheckReadBytesAvailable( need - buffered);
end;
@@ -1470,9 +1512,10 @@
procedure TTcpSocketStreamImpl.Flush;
begin
-
+ // nothing to do
end;
+
function TTcpSocketStreamImpl.IsOpen: Boolean;
begin
{$IFDEF OLD_SOCKETS}
@@ -1557,7 +1600,7 @@
{$IFDEF LINUX}
result := Libc.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
{$ENDIF}
-
+
if result = SOCKET_ERROR
then wsaError := WSAGetLastError;
@@ -1638,10 +1681,7 @@
TWaitForData.wfd_Timeout : begin
if (FTimeout = 0)
then Exit
- else begin
- raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
-
- end;
+ else raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
end;
else
ASSERT( FALSE);
@@ -1764,10 +1804,5 @@
{$ENDIF}
-procedure TTcpSocketStreamImpl.CheckReadBytesAvailable( const value : Integer);
-begin
- // we can't really tell, no further checks possible
-end;
-
end.