THRIFT-5012 Centralize configuration aspects into a commonly used configuration object [ci skip]
Client: Delphi
Patch: Jens Geyer

This closes #1955
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
index 0a9a39e..af62548 100644
--- a/lib/delphi/src/Thrift.Transport.pas
+++ b/lib/delphi/src/Thrift.Transport.pas
@@ -38,6 +38,7 @@
       Thrift.Socket,
     {$ENDIF}
   {$ENDIF}
+  Thrift.Configuration,
   Thrift.Collections,
   Thrift.Exception,
   Thrift.Utils,
@@ -49,28 +50,10 @@
   DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
 
 type
-  ITransportControl = interface
-    ['{CDA35E2C-F1D2-4BE3-9927-7F1540923265}']
-    function  MaxAllowedMessageSize : Integer;
-    procedure ConsumeReadBytes( const count : Integer);
-    procedure ResetConsumedMessageSize;
-  end;
-
-  TTransportControlImpl = class( TInterfacedObject, ITransportControl)
-  strict private
-    FMaxAllowedMsgSize : Integer;
-    FRemainingMsgSize : Integer;
-  strict protected
-    // ITransportControl
-    function  MaxAllowedMessageSize : Integer;
-    procedure ConsumeReadBytes( const count : Integer);
-    procedure ResetConsumedMessageSize;
-  public
-    constructor Create( const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE);  reintroduce;
-  end;
+  IStreamTransport = interface;
 
   ITransport = interface
-    ['{938F6EB5-1848-43D5-8AC4-07633C55B229}']
+    ['{52F81383-F880-492F-8AA7-A66B85B93D6B}']
     function GetIsOpen: Boolean;
     property IsOpen: Boolean read GetIsOpen;
     function Peek: Boolean;
@@ -87,14 +70,14 @@
     procedure Write( const pBuf : Pointer; len : Integer); overload;
     procedure Flush;
 
-    function  TransportControl : ITransportControl;
-    procedure CheckReadBytesAvailable( const value : Integer);
+    function  Configuration : IThriftConfiguration;
+    function  MaxMessageSize : Integer;
+    procedure ResetConsumedMessageSize( const knownSize : Int64 = -1);
+    procedure CheckReadBytesAvailable( const numBytes : Int64);
+    procedure UpdateKnownMessageSize( const size : Int64);
   end;
 
-  TTransportImpl = class( TInterfacedObject, ITransport)
-  strict private
-    FTransportControl : ITransportControl;
-
+  TTransportBase = class abstract( TInterfacedObject)
   strict protected
     function GetIsOpen: Boolean; virtual; abstract;
     property IsOpen: Boolean read GetIsOpen;
@@ -112,12 +95,44 @@
     procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract;
     procedure Flush; virtual;
 
-    function  TransportControl : ITransportControl;  inline;
-    procedure ConsumeReadBytes( const count : Integer);  inline;
-    procedure CheckReadBytesAvailable( const value : Integer); virtual; abstract;
+    function  Configuration : IThriftConfiguration; virtual; abstract;
+    procedure UpdateKnownMessageSize( const size : Int64); virtual; abstract;
+  end;
 
+  // base class for all endpoint transports, e.g. sockets, pipes or HTTP
+  TEndpointTransportBase = class abstract( TTransportBase, ITransport)
+  strict private
+    FRemainingMessageSize : Int64;
+    FKnownMessageSize : Int64;
+    FConfiguration : IThriftConfiguration;
+  strict protected
+    function  Configuration : IThriftConfiguration; override;
+    function  MaxMessageSize : Integer;
+    property  RemainingMessageSize : Int64 read FRemainingMessageSize;
+    property  KnownMessageSize : Int64 read FKnownMessageSize;
+    procedure ResetConsumedMessageSize( const newSize : Int64 = -1); inline;
+    procedure UpdateKnownMessageSize(const size : Int64); override;
+    procedure CheckReadBytesAvailable(const numBytes : Int64); inline;
+    procedure CountConsumedMessageBytes(const numBytes : Int64); inline;
   public
-    constructor Create( const aTransportCtl : ITransportControl);  reintroduce;
+    constructor Create( const aConfig : IThriftConfiguration);  reintroduce;
+  end;
+
+  // base class for all layered transports, e.g. framed
+  TLayeredTransportBase<T : ITransport> = class abstract( TTransportBase, ITransport)
+  strict private
+    FTransport : T;
+  strict protected
+    property  InnerTransport : T read FTransport;
+    function  GetUnderlyingTransport: ITransport;
+    function  Configuration : IThriftConfiguration; override;
+    procedure UpdateKnownMessageSize( const size : Int64); override;
+    function  MaxMessageSize : Integer;  inline;
+    procedure ResetConsumedMessageSize( const knownSize : Int64 = -1);  inline;
+    procedure CheckReadBytesAvailable( const numBytes : Int64);   virtual;
+  public
+    constructor Create( const aTransport: T); reintroduce;
+    property UnderlyingTransport: ITransport read GetUnderlyingTransport;
   end;
 
   TTransportException = class abstract( TException)
@@ -220,17 +235,23 @@
   end;
 
   IServerTransport = interface
-    ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
+    ['{FA01363F-6B40-482F-971E-4A085535EFC8}']
     procedure Listen;
     procedure Close;
     function Accept( const fnAccepting: TProc): ITransport;
+    function Configuration : IThriftConfiguration;
   end;
 
   TServerTransportImpl = class( TInterfacedObject, IServerTransport)
+  strict private
+    FConfig : IThriftConfiguration;
   strict protected
+    function  Configuration : IThriftConfiguration;
     procedure Listen; virtual; abstract;
     procedure Close; virtual; abstract;
-    function Accept( const fnAccepting: TProc): ITransport;  virtual; abstract;
+    function  Accept( const fnAccepting: TProc): ITransport;  virtual; abstract;
+  public
+    constructor Create( const aConfig : IThriftConfiguration);
   end;
 
   ITransportFactory = interface
@@ -238,11 +259,13 @@
     function GetTransport( const aTransport: ITransport): ITransport;
   end;
 
-  TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
+  TTransportFactoryImpl = class ( TInterfacedObject, ITransportFactory)
+  strict protected
     function GetTransport( const aTransport: ITransport): ITransport; virtual;
   end;
 
-  TTcpSocketStreamImpl = class( TThriftStreamImpl )
+
+  TTcpSocketStreamImpl = class( TThriftStreamImpl)
 {$IFDEF OLD_SOCKETS}
   strict private type
     TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
@@ -261,7 +284,6 @@
   strict protected
     procedure Write( const pBuf : Pointer; offset, count: Integer); override;
     function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
-    procedure CheckReadBytesAvailable( const value : Integer);  override;
     procedure Open; override;
     procedure Close; override;
     procedure Flush; override;
@@ -270,9 +292,9 @@
     function ToArray: TBytes; override;
   public
 {$IFDEF OLD_SOCKETS}
-    constructor Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer = 0);
+    constructor Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer = DEFAULT_THRIFT_TIMEOUT);
 {$ELSE}
-    constructor Create( const aTcpClient: TSocket; const aTimeout : Longword = 0);
+    constructor Create( const aTcpClient: TSocket; const aTimeout : Longword = DEFAULT_THRIFT_TIMEOUT);
 {$ENDIF}
   end;
 
@@ -284,7 +306,7 @@
     property OutputStream : IThriftStream read GetOutputStream;
   end;
 
-  TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
+  TStreamTransportImpl = class( TEndpointTransportBase, IStreamTransport)
   strict protected
     FInputStream : IThriftStream;
     FOutputStream : IThriftStream;
@@ -294,7 +316,6 @@
     function GetInputStream: IThriftStream;
     function GetOutputStream: IThriftStream;
 
-    procedure CheckReadBytesAvailable( const value : Integer); override;
   strict protected
     procedure Open; override;
     procedure Close; override;
@@ -302,7 +323,7 @@
     function  Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
     procedure Write( const pBuf : Pointer; off, len : Integer); override;
   public
-    constructor Create( const aInputStream, aOutputStream : IThriftStream; const aTransportCtl : ITransportControl = nil);
+    constructor Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration = nil);  reintroduce;
     destructor Destroy; override;
 
     property InputStream : IThriftStream read GetInputStream;
@@ -318,12 +339,13 @@
   strict protected
     procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
     function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
-    procedure CheckReadBytesAvailable( const value : Integer); override;
     procedure Open;  override;
     procedure Close; override;
     procedure Flush; override;
     function IsOpen: Boolean; override;
     function ToArray: TBytes; override;
+    function Size : Int64; override;
+    function Position : Int64; override;
   public
     constructor Create( const aStream: IThriftStream; const aBufSize : Integer);
     destructor Destroy; override;
@@ -340,38 +362,34 @@
 {$ENDIF}
     FUseBufferedSocket : Boolean;
     FOwnsServer : Boolean;
-    FTransportControl : ITransportControl;
 
   strict protected
     function Accept( const fnAccepting: TProc) : ITransport; override;
-    property TransportControl : ITransportControl read FTransportControl;
 
   public
-{$IFDEF OLD_SOCKETS}
-    constructor Create( const aServer: TTcpServer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
-    constructor Create( const aPort: Integer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aTransportCtl : ITransportControl = nil); overload;
-{$ELSE}
-    constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
-    constructor Create( const aPort: Integer; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aTransportCtl : ITransportControl = nil); overload;
-{$ENDIF}
+    {$IFDEF OLD_SOCKETS}
+    constructor Create( const aServer: TTcpServer; const aClientTimeout : Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil);  overload;
+    constructor Create( const aPort: Integer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil);  overload;
+    {$ELSE}
+    constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil);  overload;
+    constructor Create( const aPort: Integer; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil);  overload;
+    {$ENDIF}
+
     destructor Destroy; override;
     procedure Listen; override;
     procedure Close; override;
   end;
 
-  TBufferedTransportImpl = class( TTransportImpl )
+  TBufferedTransportImpl = class( TLayeredTransportBase<IStreamTransport>)
   strict private
     FInputBuffer : IThriftStream;
     FOutputBuffer : IThriftStream;
-    FTransport : IStreamTransport;
     FBufSize : Integer;
 
     procedure InitBuffers;
-    function GetUnderlyingTransport: ITransport;
   strict protected
     function GetIsOpen: Boolean; override;
     procedure Flush; override;
-    procedure CheckReadBytesAvailable( const value : Integer);  override;
   public
     type
       TFactory = class( TTransportFactoryImpl )
@@ -384,7 +402,7 @@
     procedure Close(); override;
     function  Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
     procedure Write( const pBuf : Pointer; off, len : Integer); override;
-    property UnderlyingTransport: ITransport read GetUnderlyingTransport;
+    procedure CheckReadBytesAvailable( const value : Int64); override;
     property IsOpen: Boolean read GetIsOpen;
   end;
 
@@ -408,15 +426,16 @@
   strict protected
     function GetIsOpen: Boolean; override;
   public
-    procedure Open; override;
 {$IFDEF OLD_SOCKETS}
-    constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
-    constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
+    constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
+    constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
 {$ELSE}
-    constructor Create(const aClient: TSocket; const aOwnsClient: Boolean; const aTransportCtl : ITransportControl = nil); overload;
-    constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
+    constructor Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration = nil); overload;
+    constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
 {$ENDIF}
     destructor Destroy; override;
+
+    procedure Open; override;
     procedure Close; override;
 {$IFDEF OLD_SOCKETS}
     property TcpClient: TCustomIpClient read FClient;
@@ -427,29 +446,25 @@
     property Port: Integer read FPort;
   end;
 
-  TFramedTransportImpl = class( TTransportImpl)
-  strict protected const
-    DEFAULT_MAX_LENGTH = 16384000;      // this value is used by all Thrift libraries
+  TFramedTransportImpl = class( TLayeredTransportBase<ITransport>)
   strict protected type
     TFramedHeader = Int32;
   strict protected
-    FTransport : ITransport;
     FWriteBuffer : TMemoryStream;
     FReadBuffer : TMemoryStream;
-    FMaxFrameSize : Integer;
 
-    procedure InitMaxFrameSize;
     procedure InitWriteBuffer;
     procedure ReadFrame;
 
     procedure Open(); override;
-    function GetIsOpen: Boolean; override;
+    function  GetIsOpen: Boolean; override;
 
     procedure Close(); override;
     function  Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
     procedure Write( const pBuf : Pointer; off, len : Integer); override;
+    procedure CheckReadBytesAvailable( const value : Int64);  override;
     procedure Flush; override;
-    procedure CheckReadBytesAvailable( const value : Integer); override;
+
   public
     type
       TFactory = class( TTransportFactoryImpl )
@@ -457,7 +472,6 @@
         function GetTransport( const aTransport: ITransport): ITransport; override;
       end;
 
-    constructor Create( const aTransportCtl : ITransportControl); overload;
     constructor Create( const aTransport: ITransport); overload;
     destructor Destroy; override;
   end;
@@ -469,80 +483,33 @@
 implementation
 
 
-{ TTransportControlImpl }
+{ TTransportBase }
 
-constructor TTransportControlImpl.Create( const aMaxMessageSize : Integer);
-begin
-  inherited Create;
-
-  if aMaxMessageSize > 0
-  then FMaxAllowedMsgSize := aMaxMessageSize
-  else FMaxAllowedMsgSize := DEFAULT_MAX_MESSAGE_SIZE;
-
-  ResetConsumedMessageSize;
-end;
-
-function TTransportControlImpl.MaxAllowedMessageSize : Integer;
-begin
-  result := FMaxAllowedMsgSize;
-end;
-
-procedure TTransportControlImpl.ResetConsumedMessageSize;
-begin
-  FRemainingMsgSize := MaxAllowedMessageSize;
-end;
-
-
-procedure TTransportControlImpl.ConsumeReadBytes( const count : Integer);
-begin
-  if FRemainingMsgSize >= count
-  then Dec( FRemainingMsgSize, count)
-  else begin
-    FRemainingMsgSize := 0;
-    if FRemainingMsgSize < count
-    then raise TTransportExceptionEndOfFile.Create('Maximum message size reached');
-  end;
-end;
-
-
-{ TTransportImpl }
-
-constructor TTransportImpl.Create( const aTransportCtl : ITransportControl);
-begin
-  inherited Create;
-
-  if aTransportCtl <> nil
-  then FTransportControl := aTransportCtl
-  else FTransportControl := TTransportControlImpl.Create;
-  ASSERT( FTransportControl <> nil);
-end;
-
-
-procedure TTransportImpl.Flush;
+procedure TTransportBase.Flush;
 begin
   // nothing to do
 end;
 
-function TTransportImpl.Peek: Boolean;
+function TTransportBase.Peek: Boolean;
 begin
   Result := IsOpen;
 end;
 
-function TTransportImpl.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
+function TTransportBase.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
 begin
   if Length(buf) > 0
   then result := Read( @buf[0], Length(buf), off, len)
   else result := 0;
 end;
 
-function TTransportImpl.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
+function TTransportBase.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
 begin
   if Length(buf) > 0
   then result := ReadAll( @buf[0], Length(buf), off, len)
   else result := 0;
 end;
 
-function TTransportImpl.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
+function TTransportBase.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
 var ret : Integer;
 begin
   result := 0;
@@ -554,37 +521,144 @@
   end;
 end;
 
-procedure TTransportImpl.Write( const buf: TBytes);
+procedure TTransportBase.Write( const buf: TBytes);
 begin
   if Length(buf) > 0
   then Write( @buf[0], 0, Length(buf));
 end;
 
-procedure TTransportImpl.Write( const buf: TBytes; off: Integer; len: Integer);
+procedure TTransportBase.Write( const buf: TBytes; off: Integer; len: Integer);
 begin
   if Length(buf) > 0
   then Write( @buf[0], off, len);
 end;
 
-procedure TTransportImpl.Write( const pBuf : Pointer; len : Integer);
+procedure TTransportBase.Write( const pBuf : Pointer; len : Integer);
 begin
   Self.Write( pBuf, 0, len);
 end;
 
 
-function TTransportImpl.TransportControl : ITransportControl;
+{ TEndpointTransportBase }
+
+constructor TEndpointTransportBase.Create( const aConfig : IThriftConfiguration);
 begin
-  result := FTransportControl;
+  inherited Create;
+
+  if aConfig <> nil
+  then FConfiguration := aConfig
+  else FConfiguration := TThriftConfigurationImpl.Create;
+
+  ResetConsumedMessageSize;
 end;
 
 
-procedure TTransportImpl.ConsumeReadBytes( const count : Integer);
+function TEndpointTransportBase.Configuration : IThriftConfiguration;
 begin
-  if FTransportControl <> nil
-  then FTransportControl.ConsumeReadBytes( count);
+  result := FConfiguration;
 end;
 
 
+function TEndpointTransportBase.MaxMessageSize : Integer;
+begin
+  ASSERT( Configuration <> nil);
+  result := Configuration.MaxMessageSize;
+end;
+
+
+procedure TEndpointTransportBase.ResetConsumedMessageSize( const newSize : Int64);
+// Resets RemainingMessageSize to the configured maximum
+begin
+  // full reset
+  if newSize < 0 then begin
+    FKnownMessageSize := MaxMessageSize;
+    FRemainingMessageSize := MaxMessageSize;
+    Exit;
+  end;
+
+  // update only: message size can shrink, but not grow
+  ASSERT( KnownMessageSize <= MaxMessageSize);
+  if newSize > KnownMessageSize
+  then TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
+
+  FKnownMessageSize := newSize;
+  FRemainingMessageSize := newSize;
+end;
+
+
+procedure TEndpointTransportBase.UpdateKnownMessageSize( const size : Int64);
+// Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport).
+// Will throw if we already consumed too many bytes.
+var consumed : Int64;
+begin
+  consumed := KnownMessageSize - RemainingMessageSize;
+  ResetConsumedMessageSize(size);
+  CountConsumedMessageBytes(consumed);
+end;
+
+
+procedure TEndpointTransportBase.CheckReadBytesAvailable( const numBytes : Int64);
+// Throws if there are not enough bytes in the input stream to satisfy a read of numBytes bytes of data
+begin
+  if RemainingMessageSize < numBytes
+  then raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
+end;
+
+
+procedure TEndpointTransportBase.CountConsumedMessageBytes( const numBytes : Int64);
+// Consumes numBytes from the RemainingMessageSize.
+begin
+  if (RemainingMessageSize >= numBytes)
+  then Dec( FRemainingMessageSize, numBytes)
+  else begin
+    FRemainingMessageSize := 0;
+    raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
+  end;
+end;
+
+{ TLayeredTransportBase }
+
+constructor TLayeredTransportBase<T>.Create( const aTransport: T);
+begin
+  inherited Create;
+  FTransport := aTransport;
+end;
+
+function TLayeredTransportBase<T>.GetUnderlyingTransport: ITransport;
+begin
+  result := InnerTransport;
+end;
+
+function TLayeredTransportBase<T>.Configuration : IThriftConfiguration;
+begin
+  result := InnerTransport.Configuration;
+end;
+
+procedure TLayeredTransportBase<T>.UpdateKnownMessageSize( const size : Int64);
+begin
+  InnerTransport.UpdateKnownMessageSize( size);
+end;
+
+
+function TLayeredTransportBase<T>.MaxMessageSize : Integer;
+begin
+  result := InnerTransport.MaxMessageSize;
+end;
+
+
+procedure TLayeredTransportBase<T>.ResetConsumedMessageSize( const knownSize : Int64 = -1);
+begin
+  InnerTransport.ResetConsumedMessageSize( knownSize);
+end;
+
+
+procedure TLayeredTransportBase<T>.CheckReadBytesAvailable( const numBytes : Int64);
+begin
+  InnerTransport.CheckReadBytesAvailable( numBytes);
+end;
+
+
+
 { TTransportException }
 
 constructor TTransportException.HiddenCreate(const Msg: string);
@@ -676,18 +750,33 @@
   Result := aTransport;
 end;
 
+
+{ TServerTransportImpl }
+
+constructor TServerTransportImpl.Create( const aConfig : IThriftConfiguration);
+begin
+  inherited Create;
+  if aConfig <> nil
+  then FConfig := aConfig
+  else FConfig := TThriftConfigurationImpl.Create;
+end;
+
+function TServerTransportImpl.Configuration : IThriftConfiguration;
+begin
+  result := FConfig;
+end;
+
 { TServerSocket }
 
 {$IFDEF OLD_SOCKETS}
-constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer; const aTransportCtl : ITransportControl);
+constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer; const aConfig : IThriftConfiguration);
 {$ELSE}
-constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword; const aTransportCtl : ITransportControl);
+constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword; const aConfig : IThriftConfiguration);
 {$ENDIF}
 begin
-  inherited Create;
+  inherited Create( aConfig);
   FServer := aServer;
-  FTransportControl := aTransportCtl;
-  ASSERT( FTransportControl <> nil);
+
 
 {$IFDEF OLD_SOCKETS}
   FClientTimeout := aClientTimeout;
@@ -699,17 +788,12 @@
 
 
 {$IFDEF OLD_SOCKETS}
-constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; aUseBufferedSockets: Boolean; const aTransportCtl : ITransportControl);
+constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration);
 {$ELSE}
-constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; aUseBufferedSockets: Boolean; const aTransportCtl : ITransportControl);
+constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration);
 {$ENDIF}
 begin
-  inherited Create;
-
-  if aTransportCtl <> nil
-  then FTransportControl := aTransportCtl
-  else FTransportControl := TTransportControlImpl.Create;
-  ASSERT( FTransportControl <> nil);
+  inherited Create( aConfig);
 
 {$IFDEF OLD_SOCKETS}
   FPort := aPort;
@@ -772,7 +856,7 @@
       Exit;
     end;
 
-    trans := TSocketImpl.Create( client, TRUE, FClientTimeout, TransportControl);
+    trans := TSocketImpl.Create( client, TRUE, FClientTimeout, Configuration);
     client := nil;  // trans owns it now
 
     if FUseBufferedSocket
@@ -791,7 +875,7 @@
 
   client := FServer.Accept;
   try
-    trans := TSocketImpl.Create(client, True, TransportControl);
+    trans := TSocketImpl.Create(client, TRUE, Configuration);
     client := nil;
 
     if FUseBufferedSocket then
@@ -840,9 +924,9 @@
 { TSocket }
 
 {$IFDEF OLD_SOCKETS}
-constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer; const aTransportCtl : ITransportControl);
+constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer; const aConfig : IThriftConfiguration);
 {$ELSE}
-constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean; const aTransportCtl : ITransportControl);
+constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration);
 {$ENDIF}
 var stream : IThriftStream;
 begin
@@ -856,16 +940,17 @@
 {$ENDIF}
 
   stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
-  inherited Create( stream, stream, aTransportCtl);
+  inherited Create( stream, stream, aConfig);
 end;
 
+
 {$IFDEF OLD_SOCKETS}
-constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer; const aTransportCtl : ITransportControl);
+constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer; const aConfig : IThriftConfiguration);
 {$ELSE}
-constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword; const aTransportCtl : ITransportControl);
+constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword; const aConfig : IThriftConfiguration);
 {$ENDIF}
 begin
-  inherited Create(nil,nil, aTransportCtl);
+  inherited Create(nil,nil, aConfig);
   FHost := aHost;
   FPort := aPort;
   FTimeout := aTimeout;
@@ -1043,30 +1128,12 @@
 end;
 
 
-procedure TBufferedStreamImpl.CheckReadBytesAvailable( const value : Integer);
-var nRequired : Integer;
-begin
-  nRequired := value;
-
-  if FReadBuffer <> nil then begin
-    Dec( nRequired, (FReadBuffer.Position - FReadBuffer.Size));
-    if nRequired <= 0 then Exit;
-  end;
-
-  if FStream <> nil
-  then FStream.CheckReadBytesAvailable( nRequired)
-  else raise TTransportExceptionEndOfFile.Create('Not enough input data');
-end;
-
-
 function TBufferedStreamImpl.ToArray: TBytes;
 var len : Integer;
 begin
-  len := 0;
-
-  if IsOpen then begin
-    len := FReadBuffer.Size;
-  end;
+  if IsOpen
+  then len := FReadBuffer.Size
+  else len := 0;
 
   SetLength( Result, len);
 
@@ -1092,11 +1159,24 @@
   end;
 end;
 
+
+function TBufferedStreamImpl.Size : Int64;
+begin
+  result := FReadBuffer.Size;
+end;
+
+
+function TBufferedStreamImpl.Position : Int64;
+begin
+  result := FReadBuffer.Position;
+end;
+
+
 { TStreamTransportImpl }
 
-constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aTransportCtl : ITransportControl);
+constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration);
 begin
-  inherited Create( aTransportCtl);
+  inherited Create( aConfig);
   FInputStream := aInputStream;
   FOutputStream := aOutputStream;
 end;
@@ -1149,7 +1229,7 @@
   then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
 
   Result := FInputStream.Read( pBuf,buflen, off, len );
-  ConsumeReadBytes( result);
+  CountConsumedMessageBytes( result);
 end;
 
 procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
@@ -1160,28 +1240,19 @@
   FOutputStream.Write( pBuf, off, len );
 end;
 
-procedure TStreamTransportImpl.CheckReadBytesAvailable( const value : Integer);
-begin
-  if FInputStream <> nil
-  then FInputStream.CheckReadBytesAvailable( value)
-  else raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
-end;
-
-
 { TBufferedTransportImpl }
 
 constructor TBufferedTransportImpl.Create( const aTransport : IStreamTransport; const aBufSize: Integer);
 begin
   ASSERT( aTransport <> nil);
-  inherited Create( aTransport.TransportControl);
-  FTransport := aTransport;
+  inherited Create( aTransport);
   FBufSize := aBufSize;
   InitBuffers;
 end;
 
 procedure TBufferedTransportImpl.Close;
 begin
-  FTransport.Close;
+  InnerTransport.Close;
   FInputBuffer := nil;
   FOutputBuffer := nil;
 end;
@@ -1195,34 +1266,29 @@
 
 function TBufferedTransportImpl.GetIsOpen: Boolean;
 begin
-  Result := FTransport.IsOpen;
-end;
-
-function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
-begin
-  Result := FTransport;
+  Result := InnerTransport.IsOpen;
 end;
 
 procedure TBufferedTransportImpl.InitBuffers;
 begin
-  if FTransport.InputStream <> nil then begin
-    FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize );
+  if InnerTransport.InputStream <> nil then begin
+    FInputBuffer := TBufferedStreamImpl.Create( InnerTransport.InputStream, FBufSize );
   end;
-  if FTransport.OutputStream <> nil then begin
-    FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize );
+  if InnerTransport.OutputStream <> nil then begin
+    FOutputBuffer := TBufferedStreamImpl.Create( InnerTransport.OutputStream, FBufSize );
   end;
 end;
 
 procedure TBufferedTransportImpl.Open;
 begin
-  FTransport.Open;
+  InnerTransport.Open;
   InitBuffers;  // we need to get the buffers to match FTransport substreams again
 end;
 
 function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
 begin
   if FInputBuffer <> nil
-  then Result := FInputBuffer.Read( pBuf,buflen, off, len)
+  then Result := FInputBuffer.Read( pBuf,buflen, off, len )
   else Result := 0;
 end;
 
@@ -1233,23 +1299,18 @@
   end;
 end;
 
-procedure TBufferedTransportImpl.CheckReadBytesAvailable( const value : Integer);
-var stm2 : IThriftStream2;
-    need : Integer;
+procedure TBufferedTransportImpl.CheckReadBytesAvailable( const value : Int64);
+var buffered, need : Int64;
 begin
   need := value;
 
   // buffered bytes
-  if Supports( FInputBuffer, IThriftStream2, stm2) then begin
-    Dec( need, stm2.Size - stm2.Position);
-    if need <= 0 then Exit;
-  end;
-
-  if FInputBuffer <> nil
-  then FInputBuffer.CheckReadBytesAvailable( need)
-  else raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
+  buffered := FInputBuffer.Size - FInputBuffer.Position;
+  if buffered < need
+  then InnerTransport.CheckReadBytesAvailable( need - buffered);
 end;
 
+
 { TBufferedTransportImpl.TFactory }
 
 function TBufferedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
@@ -1260,53 +1321,33 @@
 
 { TFramedTransportImpl }
 
-constructor TFramedTransportImpl.Create( const aTransportCtl : ITransportControl);
-begin
-  inherited Create( aTransportCtl);
-
-  InitMaxFrameSize;
-  InitWriteBuffer;
-end;
-
 constructor TFramedTransportImpl.Create( const aTransport: ITransport);
 begin
   ASSERT( aTransport <> nil);
-  inherited Create( aTransport.TransportControl);
+  inherited Create( aTransport);
 
-  InitMaxFrameSize;
   InitWriteBuffer;
-  FTransport := aTransport;
 end;
 
 destructor TFramedTransportImpl.Destroy;
 begin
   FWriteBuffer.Free;
+  FWriteBuffer := nil;
   FReadBuffer.Free;
+  FReadBuffer := nil;
   inherited;
 end;
 
-procedure TFramedTransportImpl.InitMaxFrameSize;
-var maxLen : Integer;
-begin
-  FMaxFrameSize := DEFAULT_MAX_LENGTH;
-
-  // MaxAllowedMessageSize may be smaller, but not larger
-  if TransportControl <> nil then begin
-    maxLen := TransportControl.MaxAllowedMessageSize - SizeOf(TFramedHeader);
-    FMaxFrameSize := Min( FMaxFrameSize, maxLen);
-  end;
-end;
-
 procedure TFramedTransportImpl.Close;
 begin
-  FTransport.Close;
+  InnerTransport.Close;
 end;
 
 procedure TFramedTransportImpl.Flush;
 var
   buf : TBytes;
   len : Integer;
-  data_len : Integer;
+  data_len : Int64;
 begin
   if not IsOpen
   then raise TTransportExceptionNotOpen.Create('not open');
@@ -1318,9 +1359,9 @@
   end;
 
   data_len := len - SizeOf(TFramedHeader);
-  if (data_len < 0) then begin
-    raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: data_len < 0' );
-  end;
+  if (0 > data_len) or (data_len > Configuration.MaxFrameSize)
+  then raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: invalid frame size ('+IntToStr(data_len)+')')
+  else UpdateKnownMessageSize( len);
 
   InitWriteBuffer;
 
@@ -1329,13 +1370,13 @@
   buf[2] := Byte($FF and (data_len shr 8));
   buf[3] := Byte($FF and data_len);
 
-  FTransport.Write( buf, 0, len );
-  FTransport.Flush;
+  InnerTransport.Write( buf, 0, len );
+  InnerTransport.Flush;
 end;
 
 function TFramedTransportImpl.GetIsOpen: Boolean;
 begin
-  Result := FTransport.IsOpen;
+  Result := InnerTransport.IsOpen;
 end;
 
 type
@@ -1353,7 +1394,7 @@
 
 procedure TFramedTransportImpl.Open;
 begin
-  FTransport.Open;
+  InnerTransport.Open;
 end;
 
 function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
@@ -1382,7 +1423,7 @@
   size : Integer;
   buff : TBytes;
 begin
-  FTransport.ReadAll( @i32rd[0], SizeOf(i32rd), 0, SizeOf(i32rd));
+  InnerTransport.ReadAll( @i32rd[0], SizeOf(i32rd), 0, SizeOf(i32rd));
   size :=
     ((i32rd[0] and $FF) shl 24) or
     ((i32rd[1] and $FF) shl 16) or
@@ -1394,14 +1435,15 @@
     raise TTransportExceptionCorruptedData.Create('Read a negative frame size ('+IntToStr(size)+')');
   end;
 
-  if size > FMaxFrameSize then begin
+  if Int64(size) > Int64(Configuration.MaxFrameSize) then begin
     Close();
-    raise TTransportExceptionCorruptedData.Create('Frame size ('+IntToStr(size)+') larger than allowed maximum ('+IntToStr(FMaxFrameSize)+')');
+    raise TTransportExceptionCorruptedData.Create('Frame size ('+IntToStr(size)+') larger than allowed maximum ('+IntToStr(Configuration.MaxFrameSize)+')');
   end;
 
-  FTransport.CheckReadBytesAvailable( size);
+  UpdateKnownMessageSize(size + SizeOf(size));
+
   SetLength( buff, size );
-  FTransport.ReadAll( buff, 0, size );
+  InnerTransport.ReadAll( buff, 0, size );
 
   FreeAndNil( FReadBuffer);
   FReadBuffer := TMemoryStream.Create;
@@ -1422,15 +1464,15 @@
 end;
 
 
-procedure TFramedTransportImpl.CheckReadBytesAvailable( const value : Integer);
-var nRemaining : Int64;
+procedure TFramedTransportImpl.CheckReadBytesAvailable( const value : Int64);
+var buffered, need : Int64;
 begin
-  if FReadBuffer = nil
-  then raise TTransportExceptionEndOfFile.Create('Cannot read from null inputstream');
+  need := value;
 
-  nRemaining := FReadBuffer.Size - FReadBuffer.Position;
-  if value > nRemaining
-  then raise TTransportExceptionEndOfFile.Create('Not enough input data');
+  // buffered bytes
+  buffered := FReadBuffer.Size - FReadBuffer.Position;
+  if buffered < need
+  then InnerTransport.CheckReadBytesAvailable( need - buffered);
 end;
 
 
@@ -1470,9 +1512,10 @@
 
 procedure TTcpSocketStreamImpl.Flush;
 begin
-
+  // nothing to do
 end;
 
+
 function TTcpSocketStreamImpl.IsOpen: Boolean;
 begin
 {$IFDEF OLD_SOCKETS}
@@ -1557,7 +1600,7 @@
     {$IFDEF LINUX}
       result := Libc.select(           socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
     {$ENDIF}
-	
+
     if result = SOCKET_ERROR
     then wsaError := WSAGetLastError;
 
@@ -1638,10 +1681,7 @@
         TWaitForData.wfd_Timeout  :  begin
           if (FTimeout = 0)
           then Exit
-          else begin
-            raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
-
-          end;
+          else raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
         end;
       else
         ASSERT( FALSE);
@@ -1764,10 +1804,5 @@
 
 {$ENDIF}
 
-procedure TTcpSocketStreamImpl.CheckReadBytesAvailable( const value : Integer);
-begin
-  // we can't really tell, no further checks possible
-end;
-
 
 end.