THRIFT-5007 Implement MAX_MESSAGE_SIZE and remaining read bytes control
Client: Delphi
Patch: Jens Geyer

This closes #1932
diff --git a/lib/delphi/src/Thrift.Processor.Multiplex.pas b/lib/delphi/src/Thrift.Processor.Multiplex.pas
index 622f730..ba77d94 100644
--- a/lib/delphi/src/Thrift.Processor.Multiplex.pas
+++ b/lib/delphi/src/Thrift.Processor.Multiplex.pas
@@ -132,6 +132,7 @@
 
 function TMultiplexedProcessorImpl.TStoredMessageProtocol.ReadMessageBegin: TThriftMessage;
 begin
+  Reset;
   result := FMessageBegin;
 end;
 
diff --git a/lib/delphi/src/Thrift.Protocol.Compact.pas b/lib/delphi/src/Thrift.Protocol.Compact.pas
index 866bd26..109e660 100644
--- a/lib/delphi/src/Thrift.Protocol.Compact.pas
+++ b/lib/delphi/src/Thrift.Protocol.Compact.pas
@@ -133,8 +133,6 @@
     constructor Create(const trans : ITransport);
     destructor Destroy;  override;
 
-    procedure Reset;
-
   strict private
     procedure WriteByteDirect( const b : Byte);  overload;
 
@@ -172,7 +170,7 @@
     procedure WriteDouble( const dub: Double); override;
     procedure WriteBinary( const b: TBytes); overload; override;
 
-  private
+  private  // unit visible stuff
     class function  DoubleToInt64Bits( const db : Double) : Int64;
     class function  Int64BitsToDouble( const i64 : Int64) : Double;
 
@@ -193,6 +191,10 @@
     //Convert a Int64 into little-endian bytes in buf starting at off and going until off+7.
     class procedure fixedLongToBytes( const n : Int64; var buf : TBytes);
 
+  strict protected
+    function GetMinSerializedSize( const aType : TType) : Integer;  override;
+    procedure Reset;  override;
+
   public
     function  ReadMessageBegin: TThriftMessage; override;
     procedure ReadMessageEnd(); override;
@@ -291,6 +293,7 @@
 
 procedure TCompactProtocolImpl.Reset;
 begin
+  inherited Reset;
   lastField_.Clear();
   lastFieldId_ := 0;
   Init( booleanField_, '', TType.Stop, 0);
@@ -735,6 +738,7 @@
   val := getTType( Byte( keyAndValueType and $F));
   Init( result, key, val, size);
   ASSERT( (result.KeyType = key) and (result.ValueType = val));
+  CheckReadBytesAvailable(result);
 end;
 
 
@@ -755,6 +759,7 @@
 
   type_ := getTType( size_and_type);
   Init( result, type_, size);
+  CheckReadBytesAvailable(result);
 end;
 
 
@@ -775,6 +780,7 @@
 
   type_ := getTType( size_and_type);
   Init( result, type_, size);
+  CheckReadBytesAvailable(result);
 end;
 
 
@@ -836,6 +842,7 @@
 var length : Integer;
 begin
   length := Integer( ReadVarint32);
+  FTrans.CheckReadBytesAvailable(length);
   SetLength( result, length);
   if (length > 0)
   then Transport.ReadAll( result, 0, length);
@@ -968,6 +975,32 @@
 end;
 
 
+function TCompactProtocolImpl.GetMinSerializedSize( const aType : TType) : Integer;
+// Return the minimum number of bytes a type will consume on the wire
+begin
+  case aType of
+    TType.Stop:    result := 0;
+    TType.Void:    result := 0;
+    TType.Bool_:   result := SizeOf(Byte);
+    TType.Byte_:   result := SizeOf(Byte);
+    TType.Double_: result := 8;  // uses fixedLongToBytes() which always writes 8 bytes
+    TType.I16:     result := SizeOf(Byte);
+    TType.I32:     result := SizeOf(Byte);
+    TType.I64:     result := SizeOf(Byte);
+    TType.String_: result := SizeOf(Byte);  // string length
+    TType.Struct:  result := 0;             // empty struct
+    TType.Map:     result := SizeOf(Byte);  // element count
+    TType.Set_:    result := SizeOf(Byte);  // element count
+    TType.List:    result := SizeOf(Byte);  // element count
+  else
+    raise TTransportExceptionBadArgs.Create('Unhandled type code');
+  end;
+end;
+
+
+
+
+
 //--- unit tests -------------------------------------------
 
 {$IFDEF Debug}
diff --git a/lib/delphi/src/Thrift.Protocol.JSON.pas b/lib/delphi/src/Thrift.Protocol.JSON.pas
index 85cb973..e72a81d 100644
--- a/lib/delphi/src/Thrift.Protocol.JSON.pas
+++ b/lib/delphi/src/Thrift.Protocol.JSON.pas
@@ -132,6 +132,10 @@
     procedure PushContext( const aCtx : TJSONBaseContext);
     procedure PopContext;
 
+  strict protected
+    function  GetMinSerializedSize( const aType : TType) : Integer;  override;
+    procedure Reset;  override;
+
   public
     // TJSONProtocolImpl Constructor
     constructor Create( const aTrans : ITransport);
@@ -480,6 +484,13 @@
 end;
 
 
+procedure TJSONProtocolImpl.Reset;
+begin
+  inherited Reset;
+  ResetContextStack;
+end;
+
+
 procedure TJSONProtocolImpl.ResetContextStack;
 begin
   while FContextStack.Count > 0
@@ -683,6 +694,7 @@
 
 procedure TJSONProtocolImpl.WriteMessageBegin( const aMsg : TThriftMessage);
 begin
+  Reset;
   ResetContextStack;  // THRIFT-1473
 
   WriteJSONArrayStart;
@@ -1053,6 +1065,7 @@
 
 function TJSONProtocolImpl.ReadMessageBegin: TThriftMessage;
 begin
+  Reset;
   ResetContextStack;  // THRIFT-1473
 
   Init( result);
@@ -1123,6 +1136,8 @@
   result.ValueType := GetTypeIDForTypeName( str);
 
   result.Count := ReadJSONInteger;
+  CheckReadBytesAvailable(result);
+
   ReadJSONObjectStart;
 end;
 
@@ -1143,6 +1158,7 @@
   str := SysUtils.TEncoding.UTF8.GetString( ReadJSONString(FALSE));
   result.ElementType := GetTypeIDForTypeName( str);
   result.Count := ReadJSONInteger;
+  CheckReadBytesAvailable(result);
 end;
 
 
@@ -1161,6 +1177,7 @@
   str := SysUtils.TEncoding.UTF8.GetString( ReadJSONString(FALSE));
   result.ElementType := GetTypeIDForTypeName( str);
   result.Count := ReadJSONInteger;
+  CheckReadBytesAvailable(result);
 end;
 
 
@@ -1218,6 +1235,30 @@
 end;
 
 
+function TJSONProtocolImpl.GetMinSerializedSize( const aType : TType) : Integer;
+// Return the minimum number of bytes a type will consume on the wire
+begin
+  case aType of
+    TType.Stop:    result := 0;
+    TType.Void:    result := 0;
+    TType.Bool_:   result := 1;
+    TType.Byte_:   result := 1;
+    TType.Double_: result := 1;
+    TType.I16:     result := 1;
+    TType.I32:     result := 1;
+    TType.I64:     result := 1;
+    TType.String_: result := 2;  // empty string
+    TType.Struct:  result := 2;  // empty struct
+    TType.Map:     result := 2;  // empty map
+    TType.Set_:    result := 2;  // empty set
+    TType.List:    result := 2;  // empty list
+  else
+    raise TTransportExceptionBadArgs.Create('Unhandled type code');
+  end;
+end;
+
+
+
 //--- init code ---
 
 procedure InitBytes( var b : TBytes; aData : array of Byte);
diff --git a/lib/delphi/src/Thrift.Protocol.pas b/lib/delphi/src/Thrift.Protocol.pas
index 7c80221..94e6e18 100644
--- a/lib/delphi/src/Thrift.Protocol.pas
+++ b/lib/delphi/src/Thrift.Protocol.pas
@@ -196,7 +196,7 @@
   end;
 
   IProtocol = interface
-    ['{602A7FFB-0D9E-4CD8-8D7F-E5076660588A}']
+    ['{7F3640D7-5082-49E7-B562-84202F323C3A}']
     function GetTransport: ITransport;
     procedure WriteMessageBegin( const msg: TThriftMessage);
     procedure WriteMessageEnd;
@@ -248,6 +248,7 @@
     function  NextRecursionLevel : IProtocolRecursionTracker;
     procedure IncrementRecursionDepth;
     procedure DecrementRecursionDepth;
+    function  GetMinSerializedSize( const aType : TType) : Integer;
 
     property Transport: ITransport read GetTransport;
     property RecursionLimit : Integer read GetRecursionLimit write SetRecursionLimit;
@@ -265,6 +266,12 @@
     procedure IncrementRecursionDepth;
     procedure DecrementRecursionDepth;
 
+    function  GetMinSerializedSize( const aType : TType) : Integer;  virtual; abstract;
+    procedure CheckReadBytesAvailable( const value : TThriftList);  overload; inline;
+    procedure CheckReadBytesAvailable( const value : TThriftSet);  overload; inline;
+    procedure CheckReadBytesAvailable( const value : TThriftMap);  overload; inline;
+
+    procedure Reset;  virtual;
     function GetTransport: ITransport;
   public
     procedure WriteMessageBegin( const msg: TThriftMessage); virtual; abstract;
@@ -332,6 +339,7 @@
   strict protected
     FStrictRead : Boolean;
     FStrictWrite : Boolean;
+    function GetMinSerializedSize( const aType : TType) : Integer;  override;
 
   strict private
     function ReadAll( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer ): Integer;  inline;
@@ -404,6 +412,9 @@
   strict private
     FWrappedProtocol : IProtocol;
 
+  strict protected
+    function GetMinSerializedSize( const aType : TType) : Integer;  override;
+
   public
     // Encloses the specified protocol.
     // All operations will be forward to the given protocol.  Must be non-null.
@@ -583,6 +594,12 @@
   Result := FTrans;
 end;
 
+procedure TProtocolImpl.Reset;
+begin
+  if FTrans.TransportControl <> nil
+  then FTrans.TransportControl.ResetConsumedMessageSize;
+end;
+
 function TProtocolImpl.ReadAnsiString: AnsiString;
 var
   b : TBytes;
@@ -623,6 +640,29 @@
   WriteBinary( b );
 end;
 
+
+procedure TProtocolImpl.CheckReadBytesAvailable( const value : TThriftList);
+begin
+  FTrans.CheckReadBytesAvailable( value.Count * GetMinSerializedSize(value.ElementType));
+end;
+
+
+procedure TProtocolImpl.CheckReadBytesAvailable( const value : TThriftSet);
+begin
+  FTrans.CheckReadBytesAvailable( value.Count * GetMinSerializedSize(value.ElementType));
+end;
+
+
+procedure TProtocolImpl.CheckReadBytesAvailable( const value : TThriftMap);
+var nPairSize : Integer
+;
+begin
+  nPairSize := GetMinSerializedSize(value.KeyType) + GetMinSerializedSize(value.ValueType);
+  FTrans.CheckReadBytesAvailable( value.Count * nPairSize);
+end;
+
+
+
 { TProtocolUtil }
 
 class procedure TProtocolUtil.Skip( prot: IProtocol; type_: TType);
@@ -705,6 +745,7 @@
   buf : TBytes;
 begin
   size := ReadI32;
+  FTrans.CheckReadBytesAvailable( size);
   SetLength( buf, size);
   FTrans.ReadAll( buf, 0, size);
   Result := buf;
@@ -777,6 +818,7 @@
 begin
   result.ElementType := TType(ReadByte);
   result.Count       := ReadI32;
+  CheckReadBytesAvailable(result);
 end;
 
 procedure TBinaryProtocolImpl.ReadListEnd;
@@ -789,6 +831,7 @@
   result.KeyType   := TType(ReadByte);
   result.ValueType := TType(ReadByte);
   result.Count     := ReadI32;
+  CheckReadBytesAvailable(result);
 end;
 
 procedure TBinaryProtocolImpl.ReadMapEnd;
@@ -801,6 +844,7 @@
   size : Integer;
   version : Integer;
 begin
+  Reset;
   Init( result);
   size := ReadI32;
   if (size < 0) then begin
@@ -832,6 +876,7 @@
 begin
   result.ElementType := TType(ReadByte);
   result.Count       := ReadI32;
+  CheckReadBytesAvailable(result);
 end;
 
 procedure TBinaryProtocolImpl.ReadSetEnd;
@@ -842,6 +887,7 @@
 function TBinaryProtocolImpl.ReadStringBody( size: Integer): string;
 var buf : TBytes;
 begin
+  FTrans.CheckReadBytesAvailable( size);
   SetLength( buf, size);
   FTrans.ReadAll( buf, 0, size );
   Result := TEncoding.UTF8.GetString( buf);
@@ -959,6 +1005,7 @@
 procedure TBinaryProtocolImpl.WriteMessageBegin( const msg: TThriftMessage);
 var version : Cardinal;
 begin
+  Reset;
   if FStrictWrite then begin
     version := VERSION_1 or Cardinal( msg.Type_);
     WriteI32( Integer( version) );
@@ -997,6 +1044,29 @@
 
 end;
 
+function TBinaryProtocolImpl.GetMinSerializedSize( const aType : TType) : Integer;
+// Return the minimum number of bytes a type will consume on the wire
+begin
+  case aType of
+    TType.Stop:    result := 0;
+    TType.Void:    result := 0;
+    TType.Bool_:   result := SizeOf(Byte);
+    TType.Byte_:   result := SizeOf(Byte);
+    TType.Double_: result := SizeOf(Double);
+    TType.I16:     result := SizeOf(Int16);
+    TType.I32:     result := SizeOf(Int32);
+    TType.I64:     result := SizeOf(Int64);
+    TType.String_: result := SizeOf(Int32);  // string length
+    TType.Struct:  result := 0;  // empty struct
+    TType.Map:     result := SizeOf(Int32);  // element count
+    TType.Set_:    result := SizeOf(Int32);  // element count
+    TType.List:    result := SizeOf(Int32);  // element count
+  else
+    raise TTransportExceptionBadArgs.Create('Unhandled type code');
+  end;
+end;
+
+
 { TProtocolException }
 
 constructor TProtocolException.HiddenCreate(const Msg: string);
@@ -1363,6 +1433,12 @@
 end;
 
 
+function TProtocolDecorator.GetMinSerializedSize( const aType : TType) : Integer;
+begin
+  result := FWrappedProtocol.GetMinSerializedSize(aType);
+end;
+
+
 { Init helper functions }
 
 procedure Init( var rec : TThriftMessage; const AName: string; const AMessageType: TMessageType; const ASeqID: Integer);
diff --git a/lib/delphi/src/Thrift.Serializer.pas b/lib/delphi/src/Thrift.Serializer.pas
index b95cf61..1cbcbec 100644
--- a/lib/delphi/src/Thrift.Serializer.pas
+++ b/lib/delphi/src/Thrift.Serializer.pas
@@ -71,15 +71,18 @@
 
   public
     // Create a new TDeserializer that uses the TBinaryProtocol by default.
-    constructor Create;  overload;
+    constructor Create( const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE);  overload;
 
     // Create a new TDeserializer.
     // It will use the TProtocol specified by the factory that is passed in.
-    constructor Create( const factory : IProtocolFactory);  overload;
+    constructor Create( const factory : IProtocolFactory;
+                        const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE);  overload;
 
     // Create a new TDeserializer.
     // It will use the TProtocol and layered transports specified by the factories that are passed in.
-    constructor Create( const protfact : IProtocolFactory; const transfact : ITransportFactory);  overload;
+    constructor Create( const protfact : IProtocolFactory;
+                        const transfact : ITransportFactory;
+                        const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE);  overload;
 
     // DTOR
     destructor Destroy;  override;
@@ -122,7 +125,7 @@
   inherited Create;
   FStream    := TMemoryStream.Create;
   adapter    := TThriftStreamAdapterDelphi.Create( FStream, FALSE);
-  FTransport := TStreamTransportImpl.Create( nil, adapter);
+  FTransport := TStreamTransportImpl.Create( nil, adapter, TTransportControlImpl.Create(0));  // we don't read anything here
   if transfact <> nil then FTransport := transfact.GetTransport( FTransport);
   FProtocol  := protfact.GetProtocol( FTransport);
 
@@ -185,24 +188,26 @@
 { TDeserializer }
 
 
-constructor TDeserializer.Create;
+constructor TDeserializer.Create( const aMaxMessageSize : Integer);
 // Create a new TDeserializer that uses the TBinaryProtocol by default.
 begin
   //no inherited;
-  Create( TBinaryProtocolImpl.TFactory.Create, nil);
+  Create( TBinaryProtocolImpl.TFactory.Create, nil, aMaxMessageSize);
 end;
 
 
-constructor TDeserializer.Create( const factory : IProtocolFactory);
+constructor TDeserializer.Create( const factory : IProtocolFactory; const aMaxMessageSize : Integer);
 // Create a new TDeserializer.
 // It will use the TProtocol specified by the factory that is passed in.
 begin
   //no inherited;
-  Create( factory, nil);
+  Create( factory, nil, aMaxMessageSize);
 end;
 
 
-constructor TDeserializer.Create( const protfact : IProtocolFactory; const transfact : ITransportFactory);
+constructor TDeserializer.Create( const protfact : IProtocolFactory;
+                                  const transfact : ITransportFactory;
+                                  const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE);
 // Create a new TDeserializer.
 // It will use the TProtocol specified by the factory that is passed in.
 var adapter : IThriftStream;
@@ -210,7 +215,7 @@
   inherited Create;
   FStream    := TMemoryStream.Create;
   adapter    := TThriftStreamAdapterDelphi.Create( FStream, FALSE);
-  FTransport := TStreamTransportImpl.Create( adapter, nil);
+  FTransport := TStreamTransportImpl.Create( adapter, nil, TTransportControlImpl.Create(aMaxMessageSize));
   if transfact <> nil then FTransport := transfact.GetTransport( FTransport);
   FProtocol  := protfact.GetProtocol( FTransport);
 
diff --git a/lib/delphi/src/Thrift.Stream.pas b/lib/delphi/src/Thrift.Stream.pas
index 7cb9219..0f4e723 100644
--- a/lib/delphi/src/Thrift.Stream.pas
+++ b/lib/delphi/src/Thrift.Stream.pas
@@ -37,11 +37,12 @@
 
 type
   IThriftStream = interface
-    ['{2A77D916-7446-46C1-8545-0AEC0008DBCA}']
+    ['{DBE61E28-2A77-42DB-A5A3-3CCB8A2D09FA}']
     procedure Write( const buffer: TBytes; offset: Integer; count: Integer);  overload;
     procedure Write( const pBuf : Pointer; offset: Integer; count: Integer);  overload;
     function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer;  overload;
     function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;  overload;
+    procedure CheckReadBytesAvailable( const value : Integer);
     procedure Open;
     procedure Close;
     procedure Flush;
@@ -49,14 +50,24 @@
     function ToArray: TBytes;
   end;
 
-  TThriftStreamImpl = class( TInterfacedObject, IThriftStream)
+
+  IThriftStream2 = interface( IThriftStream)
+    ['{1F55D9FE-F617-4B80-B8CA-4A300D8E33F6}']
+    function Size : Int64;
+    function Position : Int64;
+  end;
+
+
+  TThriftStreamImpl = class abstract( TInterfacedObject, IThriftStream)
   strict private
     procedure CheckSizeAndOffset( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer);  overload;
   strict protected
+    // IThriftStream
     procedure Write( const buffer: TBytes; offset: Integer; count: Integer); overload; inline;
     procedure Write( const pBuf : Pointer; offset: Integer; count: Integer);  overload; virtual;
     function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; overload; inline;
     function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; overload; virtual;
+    procedure CheckReadBytesAvailable( const value : Integer);  virtual; abstract;
     procedure Open; virtual; abstract;
     procedure Close; virtual; abstract;
     procedure Flush; virtual; abstract;
@@ -64,40 +75,54 @@
     function ToArray: TBytes; virtual; abstract;
   end;
 
-  TThriftStreamAdapterDelphi = class( TThriftStreamImpl )
+  TThriftStreamAdapterDelphi = class( TThriftStreamImpl, IThriftStream2)
   strict private
     FStream : TStream;
     FOwnsStream : Boolean;
   strict protected
+    // IThriftStream
     procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
     function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
+    procedure CheckReadBytesAvailable( const value : Integer);  override;
     procedure Open; override;
     procedure Close; override;
     procedure Flush; override;
     function IsOpen: Boolean; override;
     function ToArray: TBytes; override;
+
+    // IThriftStream2
+    function Size : Int64;
+    function Position : Int64;
   public
-    constructor Create( const AStream: TStream; AOwnsStream : Boolean);
+    constructor Create( const aStream: TStream; aOwnsStream : Boolean);
     destructor Destroy; override;
   end;
 
-  TThriftStreamAdapterCOM = class( TThriftStreamImpl)
+  TThriftStreamAdapterCOM = class( TThriftStreamImpl, IThriftStream2)
   strict private
     FStream : IStream;
   strict protected
+    // IThriftStream
     procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
     function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
+    procedure CheckReadBytesAvailable( const value : Integer);  override;
     procedure Open; override;
     procedure Close; override;
     procedure Flush; override;
     function IsOpen: Boolean; override;
     function ToArray: TBytes; override;
+
+    // IThriftStream2
+    function Size : Int64;
+    function Position : Int64;
   public
-    constructor Create( const AStream: IStream);
+    constructor Create( const aStream: IStream);
   end;
 
 implementation
 
+uses Thrift.Transport;
+
 { TThriftStreamAdapterCOM }
 
 procedure TThriftStreamAdapterCOM.Close;
@@ -105,10 +130,10 @@
   FStream := nil;
 end;
 
-constructor TThriftStreamAdapterCOM.Create( const AStream: IStream);
+constructor TThriftStreamAdapterCOM.Create( const aStream: IStream);
 begin
   inherited Create;
-  FStream := AStream;
+  FStream := aStream;
 end;
 
 procedure TThriftStreamAdapterCOM.Flush;
@@ -120,6 +145,24 @@
   end;
 end;
 
+function TThriftStreamAdapterCOM.Size : Int64;
+var statstg: TStatStg;
+begin
+  FillChar( statstg, SizeOf( statstg), 0);
+  if  IsOpen
+  and Succeeded( FStream.Stat( statstg, STATFLAG_NONAME ))
+  then result := statstg.cbSize
+  else result := 0;
+end;
+
+function TThriftStreamAdapterCOM.Position : Int64;
+var newpos : {$IF CompilerVersion >= 29.0} UInt64 {$ELSE} Int64  {$IFEND};
+begin
+  if SUCCEEDED( FStream.Seek( 0, STREAM_SEEK_CUR, newpos))
+  then result := Int64(newpos)  
+  else raise TTransportExceptionEndOfFile.Create('Seek() error');
+end;
+
 function TThriftStreamAdapterCOM.IsOpen: Boolean;
 begin
   Result := FStream <> nil;
@@ -148,21 +191,21 @@
   end;
 end;
 
+procedure TThriftStreamAdapterCOM.CheckReadBytesAvailable( const value : Integer);
+var nRemaining : Int64;
+begin
+  nRemaining := Self.Size - Self.Position;
+  if nRemaining < value
+  then raise TTransportExceptionEndOfFile.Create('Not enough input data');
+end;
+
 function TThriftStreamAdapterCOM.ToArray: TBytes;
 var
-  statstg: TStatStg;
-  len : Integer;
+  len : Int64;
   NewPos : {$IF CompilerVersion >= 29.0} UInt64 {$ELSE} Int64  {$IFEND};
   cbRead : Integer;
 begin
-  FillChar( statstg, SizeOf( statstg), 0);
-  len := 0;
-  if IsOpen then begin
-    if Succeeded( FStream.Stat( statstg, STATFLAG_NONAME )) then begin
-      len := statstg.cbSize;
-    end;
-  end;
-
+  len := Self.Size;
   SetLength( Result, len );
 
   if len > 0 then begin
@@ -226,6 +269,21 @@
 
 { TThriftStreamAdapterDelphi }
 
+constructor TThriftStreamAdapterDelphi.Create( const aStream: TStream; aOwnsStream: Boolean);
+begin
+  inherited Create;
+  FStream := aStream;
+  FOwnsStream := aOwnsStream;
+end;
+
+destructor TThriftStreamAdapterDelphi.Destroy;
+begin
+  if FOwnsStream
+  then Close;
+
+  inherited;
+end;
+
 procedure TThriftStreamAdapterDelphi.Close;
 begin
   FStream.Free;
@@ -233,26 +291,21 @@
   FOwnsStream := False;
 end;
 
-constructor TThriftStreamAdapterDelphi.Create( const AStream: TStream; AOwnsStream: Boolean);
-begin
-  inherited Create;
-  FStream := AStream;
-  FOwnsStream := AOwnsStream;
-end;
-
-destructor TThriftStreamAdapterDelphi.Destroy;
-begin
-  if FOwnsStream 
-  then Close;
-  
-  inherited;
-end;
-
 procedure TThriftStreamAdapterDelphi.Flush;
 begin
   // nothing to do
 end;
 
+function TThriftStreamAdapterDelphi.Size : Int64;
+begin
+  result := FStream.Size;
+end;
+
+function TThriftStreamAdapterDelphi.Position : Int64;
+begin
+  result := FStream.Position;
+end;
+
 function TThriftStreamAdapterDelphi.IsOpen: Boolean;
 begin
   Result := FStream <> nil;
@@ -279,16 +332,21 @@
   else Result := 0;
 end;
 
+procedure TThriftStreamAdapterDelphi.CheckReadBytesAvailable( const value : Integer);
+var nRemaining : Int64;
+begin
+  nRemaining := FStream.Size - FStream.Position;
+  if nRemaining < value then raise TTransportExceptionEndOfFile.Create('Not enough input data');
+end;
+
 function TThriftStreamAdapterDelphi.ToArray: TBytes;
 var
   OrgPos : Integer;
   len : Integer;
 begin
-  len := 0;
-  if FStream <> nil then
-  begin
-    len := FStream.Size;
-  end;
+  if FStream <> nil
+  then len := FStream.Size
+  else len := 0;
 
   SetLength( Result, len );
 
diff --git a/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas b/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas
index 87bf23b..b92cce1 100644
--- a/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas
+++ b/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas
@@ -59,6 +59,7 @@
     function  Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
     procedure Write( const pBuf : Pointer; off, len : Integer); override;
     procedure Flush; override;
+    procedure CheckReadBytesAvailable( const value : Integer); override;
 
     procedure SetDnsResolveTimeout(const Value: Integer);
     function GetDnsResolveTimeout: Integer;
@@ -80,7 +81,7 @@
     property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
     property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
   public
-    constructor Create( const AUri: string);
+    constructor Create( const AUri: string; const aTransportCtl : ITransportControl = nil);
     destructor Destroy; override;
   end;
 
@@ -90,9 +91,9 @@
 
 { TMsxmlHTTPClientImpl }
 
-constructor TMsxmlHTTPClientImpl.Create(const AUri: string);
+constructor TMsxmlHTTPClientImpl.Create(const AUri: string; const aTransportCtl : ITransportControl);
 begin
-  inherited Create;
+  inherited Create( aTransportCtl);
   FUri := AUri;
 
   // defaults according to MSDN
@@ -218,6 +219,13 @@
   end;
 end;
 
+procedure TMsxmlHTTPClientImpl.CheckReadBytesAvailable( const value : Integer);
+begin
+  if FInputStream <> nil
+  then FInputStream.CheckReadBytesAvailable( value)
+  else raise TTransportExceptionNotOpen.Create('No request has been sent');
+end;
+
 function TMsxmlHTTPClientImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
 begin
   if FInputStream = nil then begin
@@ -225,7 +233,8 @@
   end;
 
   try
-    Result := FInputStream.Read( pBuf, buflen, off, len)
+    Result := FInputStream.Read( pBuf, buflen, off, len);
+    ConsumeReadBytes( result);
   except
     on E: Exception
     do raise TTransportExceptionUnknown.Create(E.Message);
diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index 9368f2f..b602b64 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -53,6 +53,7 @@
     //procedure Open; override; - see derived classes
     procedure Close; override;
     procedure Flush; override;
+    procedure CheckReadBytesAvailable( const value : Integer);  override;
 
     function  ReadDirect(     const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;  overload;
     function  ReadOverlapped( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;  overload;
@@ -123,13 +124,17 @@
   TNamedPipeTransportClientEndImpl = class( TPipeTransportBase)
   public
     // Named pipe constructors
-    constructor Create( aPipe : THandle; aOwnsHandle : Boolean;
-                        const aTimeOut : DWORD); overload;
+    constructor Create( const aPipe : THandle;
+                        const aOwnsHandle : Boolean;
+                        const aTimeOut : DWORD;
+                        const aTransportCtl : ITransportControl); overload;
+
     constructor Create( const aPipeName : string;
                         const aShareMode: DWORD = 0;
                         const aSecurityAttributes: PSecurityAttributes = nil;
                         const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT;
-                        const aOpenTimeOut : DWORD = DEFAULT_THRIFT_PIPE_OPEN_TIMEOUT);  overload;
+                        const aOpenTimeOut : DWORD = DEFAULT_THRIFT_PIPE_OPEN_TIMEOUT;
+                        const aTransportCtl : ITransportControl = nil);  overload;
   end;
 
 
@@ -139,17 +144,20 @@
   public
     // ITransport
     procedure Close; override;
-    constructor Create( aPipe : THandle; aOwnsHandle : Boolean;
-                        const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT); reintroduce;
+    constructor Create( const aPipe : THandle;
+                        const aOwnsHandle : Boolean;
+                        const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT;
+                        const aTransportCtl : ITransportControl = nil); reintroduce;
   end;
 
 
   TAnonymousPipeTransportImpl = class( TPipeTransportBase)
   public
     // Anonymous pipe constructor
-    constructor Create(const aPipeRead, aPipeWrite : THandle;
-                       aOwnsHandles : Boolean;
-                       const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT); overload;
+    constructor Create( const aPipeRead, aPipeWrite : THandle;
+                        const aOwnsHandles : Boolean;
+                        const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT;
+                        const aTransportCtl : ITransportControl = nil); overload;
   end;
 
 
@@ -310,6 +318,12 @@
 end;
 
 
+procedure TPipeStreamBase.CheckReadBytesAvailable( const value : Integer);
+begin
+  // can't tell how much we can suck out of the pipe
+end;
+
+
 procedure TPipeStreamBase.Write( const pBuf : Pointer; offset, count : Integer);
 begin
   if FOverlapped
@@ -642,21 +656,24 @@
 
 
 constructor TNamedPipeTransportClientEndImpl.Create( const aPipeName : string; const aShareMode: DWORD;
-                                   const aSecurityAttributes: PSecurityAttributes;
-                                   const aTimeOut, aOpenTimeOut : DWORD);
+                                                     const aSecurityAttributes: PSecurityAttributes;
+                                                     const aTimeOut, aOpenTimeOut : DWORD;
+                                                     const aTransportCtl : ITransportControl);
 // Named pipe constructor
 begin
-  inherited Create( nil, nil);
+  inherited Create( nil, nil, aTransportCtl);
   FInputStream  := TNamedPipeStreamImpl.Create( aPipeName, TRUE, aShareMode, aSecurityAttributes, aTimeOut, aOpenTimeOut);
   FOutputStream := FInputStream;  // true for named pipes
 end;
 
 
-constructor TNamedPipeTransportClientEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean;
-                                                     const aTimeOut : DWORD);
+constructor TNamedPipeTransportClientEndImpl.Create( const aPipe : THandle;
+                                                     const aOwnsHandle : Boolean;
+                                                     const aTimeOut : DWORD;
+                                                     const aTransportCtl : ITransportControl);
 // Named pipe constructor
 begin
-  inherited Create( nil, nil);
+  inherited Create( nil, nil, aTransportCtl);
   FInputStream  := THandlePipeStreamImpl.Create( aPipe, TRUE, aOwnsHandle, aTimeOut);
   FOutputStream := FInputStream;  // true for named pipes
 end;
@@ -665,12 +682,14 @@
 { TNamedPipeTransportServerEndImpl }
 
 
-constructor TNamedPipeTransportServerEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean;
-                                                     const aTimeOut : DWORD);
+constructor TNamedPipeTransportServerEndImpl.Create( const aPipe : THandle;
+                                                     const aOwnsHandle : Boolean;
+                                                     const aTimeOut : DWORD;
+                                                     const aTransportCtl : ITransportControl);
 // Named pipe constructor
 begin
   FHandle := DuplicatePipeHandle( aPipe);
-  inherited Create( aPipe, aOwnsHandle, aTimeOut);
+  inherited Create( aPipe, aOwnsHandle, aTimeOut, aTransportCtl);
 end;
 
 
@@ -688,11 +707,12 @@
 
 
 constructor TAnonymousPipeTransportImpl.Create( const aPipeRead, aPipeWrite : THandle;
-                                                aOwnsHandles : Boolean;
-                                                const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT);
+                                                const aOwnsHandles : Boolean;
+                                                const aTimeOut : DWORD;
+                                                const aTransportCtl : ITransportControl);
 // Anonymous pipe constructor
 begin
-  inherited Create( nil, nil);
+  inherited Create( nil, nil, aTransportCtl);
   // overlapped is not supported with AnonPipes, see MSDN
   FInputStream  := THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles, FALSE, aTimeOut);
   FOutputStream := THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles, FALSE, aTimeOut);
diff --git a/lib/delphi/src/Thrift.Transport.WinHTTP.pas b/lib/delphi/src/Thrift.Transport.WinHTTP.pas
index 0bf7e45..ec8c87f 100644
--- a/lib/delphi/src/Thrift.Transport.WinHTTP.pas
+++ b/lib/delphi/src/Thrift.Transport.WinHTTP.pas
@@ -58,16 +58,19 @@
       THTTPResponseStream = class( TThriftStreamImpl)
       strict private
         FRequest : IWinHTTPRequest;
+        FTransportControl : ITransportControl;
       strict protected
         procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
         function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
+        procedure CheckReadBytesAvailable( const value : Integer);  override;
+        procedure ConsumeReadBytes( const count : Integer);
         procedure Open; override;
         procedure Close; override;
         procedure Flush; override;
         function IsOpen: Boolean; override;
         function ToArray: TBytes; override;
       public
-        constructor Create( const aRequest : IWinHTTPRequest);
+        constructor Create( const aRequest : IWinHTTPRequest; const aTransportCtl : ITransportControl);
         destructor Destroy; override;
       end;
 
@@ -78,6 +81,7 @@
     function  Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
     procedure Write( const pBuf : Pointer; off, len : Integer); override;
     procedure Flush; override;
+    procedure CheckReadBytesAvailable( const value : Integer); override;
 
     procedure SetDnsResolveTimeout(const Value: Integer);
     function GetDnsResolveTimeout: Integer;
@@ -99,7 +103,7 @@
     property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
     property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
   public
-    constructor Create( const AUri: string);
+    constructor Create( const AUri: string; const aTransportCtl : ITransportControl = nil);
     destructor Destroy; override;
   end;
 
@@ -108,9 +112,9 @@
 
 { TWinHTTPClientImpl }
 
-constructor TWinHTTPClientImpl.Create(const AUri: string);
+constructor TWinHTTPClientImpl.Create(const AUri: string; const aTransportCtl : ITransportControl);
 begin
-  inherited Create;
+  inherited Create( aTransportCtl);
   FUri := AUri;
 
   // defaults according to MSDN
@@ -284,6 +288,13 @@
   end;
 end;
 
+procedure TWinHTTPClientImpl.CheckReadBytesAvailable( const value : Integer);
+begin
+  if FInputStream <> nil
+  then FInputStream.CheckReadBytesAvailable( value)
+  else raise TTransportExceptionNotOpen.Create('No request has been sent');
+end;
+
 function TWinHTTPClientImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
 begin
   if FInputStream = nil then begin
@@ -291,7 +302,8 @@
   end;
 
   try
-    Result := FInputStream.Read( pBuf, buflen, off, len)
+    Result := FInputStream.Read( pBuf, buflen, off, len);
+    ConsumeReadBytes( result);
   except
     on E: Exception
     do raise TTransportExceptionUnknown.Create(E.Message);
@@ -301,6 +313,7 @@
 procedure TWinHTTPClientImpl.SendRequest;
 var
   http  : IWinHTTPRequest;
+  ctrl  : ITransportControl;
   pData : PByte;
   len   : Integer;
   error : Cardinal;
@@ -327,7 +340,8 @@
     else raise TTransportExceptionInterrupted.Create( sMsg);
   end;
 
-  FInputStream := THTTPResponseStream.Create(http);
+  ctrl := TTransportControlImpl.Create( TransportControl.MaxAllowedMessageSize);
+  FInputStream := THTTPResponseStream.Create( http, ctrl);
 end;
 
 procedure TWinHTTPClientImpl.Write( const pBuf : Pointer; off, len : Integer);
@@ -341,10 +355,12 @@
 
 { TWinHTTPClientImpl.THTTPResponseStream }
 
-constructor TWinHTTPClientImpl.THTTPResponseStream.Create( const aRequest : IWinHTTPRequest);
+constructor TWinHTTPClientImpl.THTTPResponseStream.Create( const aRequest : IWinHTTPRequest; const aTransportCtl : ITransportControl);
 begin
   inherited Create;
   FRequest := aRequest;
+  FTransportControl := aTransportCtl;
+  ASSERT( FTransportControl <> nil);
 end;
 
 destructor TWinHTTPClientImpl.THTTPResponseStream.Destroy;
@@ -390,6 +406,8 @@
   if count >= buflen-offset
   then count := buflen-offset;
 
+  CheckReadBytesAvailable(count);
+
   if count > 0 then begin
     pTmp   := pBuf;
     Inc( pTmp, offset);
@@ -397,6 +415,20 @@
     ASSERT( Result >= 0);
   end
   else Result := 0;
+
+  ConsumeReadBytes( result);
+end;
+
+procedure TWinHTTPClientImpl.THTTPResponseStream.ConsumeReadBytes( const count : Integer);
+begin
+  if FTransportControl <> nil
+  then FTransportControl.ConsumeReadBytes( count);
+end;
+
+procedure TWinHTTPClientImpl.THTTPResponseStream.CheckReadBytesAvailable( const value : Integer);
+begin
+  if Int64(value) > Int64(FRequest.QueryDataAvailable)
+  then raise TTransportExceptionEndOfFile.Create('Not enough input data');
 end;
 
 function TWinHTTPClientImpl.THTTPResponseStream.ToArray: TBytes;
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
index bede57c..a3476bf 100644
--- a/lib/delphi/src/Thrift.Transport.pas
+++ b/lib/delphi/src/Thrift.Transport.pas
@@ -44,14 +44,39 @@
   Thrift.WinHTTP,
   Thrift.Stream;
 
+const
+  DEFAULT_MAX_MESSAGE_SIZE = 100 * 1024 * 1024; // 100 MB
+  DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
+
 type
+  ITransportControl = interface
+    ['{CDA35E2C-F1D2-4BE3-9927-7F1540923265}']
+    function  MaxAllowedMessageSize : Integer;
+    procedure ConsumeReadBytes( const count : Integer);
+    procedure ResetConsumedMessageSize;
+  end;
+
+  TTransportControlImpl = class( TInterfacedObject, ITransportControl)
+  strict private
+    FMaxAllowedMsgSize : Integer;
+    FRemainingMsgSize : Integer;
+  strict protected
+    // ITransportControl
+    function  MaxAllowedMessageSize : Integer;
+    procedure ConsumeReadBytes( const count : Integer);
+    procedure ResetConsumedMessageSize;
+  public
+    constructor Create( const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE);  reintroduce;
+  end;
+
   ITransport = interface
-    ['{DB84961E-8BB3-4532-99E1-A8C7AC2300F7}']
+    ['{938F6EB5-1848-43D5-8AC4-07633C55B229}']
     function GetIsOpen: Boolean;
     property IsOpen: Boolean read GetIsOpen;
     function Peek: Boolean;
     procedure Open;
     procedure Close;
+
     function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
     function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
     function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
@@ -61,15 +86,22 @@
     procedure Write( const pBuf : Pointer; off, len : Integer); overload;
     procedure Write( const pBuf : Pointer; len : Integer); overload;
     procedure Flush;
+
+    function  TransportControl : ITransportControl;
+    procedure CheckReadBytesAvailable( const value : Integer);
   end;
 
   TTransportImpl = class( TInterfacedObject, ITransport)
+  strict private
+    FTransportControl : ITransportControl;
+
   strict protected
     function GetIsOpen: Boolean; virtual; abstract;
     property IsOpen: Boolean read GetIsOpen;
     function Peek: Boolean; virtual;
     procedure Open(); virtual; abstract;
     procedure Close(); virtual; abstract;
+
     function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
     function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual; abstract;
     function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;  overload; inline;
@@ -79,6 +111,13 @@
     procedure Write( const pBuf : Pointer; len : Integer); overload; inline;
     procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract;
     procedure Flush; virtual;
+
+    function  TransportControl : ITransportControl;  inline;
+    procedure ConsumeReadBytes( const count : Integer);  inline;
+    procedure CheckReadBytesAvailable( const value : Integer); virtual; abstract;
+
+  public
+    constructor Create( const aTransportCtl : ITransportControl);  reintroduce;
   end;
 
   TTransportException = class abstract( TException)
@@ -98,9 +137,9 @@
     constructor HiddenCreate(const Msg: string);
     class function GetType: TExceptionType;  virtual; abstract;
   public
-    class function Create( AType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
+    class function Create( aType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
     class function Create( const msg: string): TTransportException; reintroduce; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
-    class function Create( AType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
+    class function Create( aType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
     property Type_: TExceptionType read GetType;
   end;
 
@@ -196,7 +235,7 @@
 
   ITransportFactory = interface
     ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
-    function GetTransport( const ATrans: ITransport): ITransport;
+    function GetTransport( const aTransport: ITransport): ITransport;
   end;
 
   TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
@@ -222,6 +261,7 @@
   strict protected
     procedure Write( const pBuf : Pointer; offset, count: Integer); override;
     function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
+    procedure CheckReadBytesAvailable( const value : Integer);  override;
     procedure Open; override;
     procedure Close; override;
     procedure Flush; override;
@@ -230,9 +270,9 @@
     function ToArray: TBytes; override;
   public
 {$IFDEF OLD_SOCKETS}
-    constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);
+    constructor Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer = 0);
 {$ELSE}
-    constructor Create( const ATcpClient: TSocket; const aTimeout : Longword = 0);
+    constructor Create( const aTcpClient: TSocket; const aTimeout : Longword = 0);
 {$ENDIF}
   end;
 
@@ -254,14 +294,15 @@
     function GetInputStream: IThriftStream;
     function GetOutputStream: IThriftStream;
 
-  protected
+    procedure CheckReadBytesAvailable( const value : Integer); override;
+  strict protected
     procedure Open; override;
     procedure Close; override;
     procedure Flush; override;
     function  Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
     procedure Write( const pBuf : Pointer; off, len : Integer); override;
   public
-    constructor Create( const aInputStream, aOutputStream : IThriftStream);
+    constructor Create( const aInputStream, aOutputStream : IThriftStream; const aTransportCtl : ITransportControl = nil);
     destructor Destroy; override;
 
     property InputStream : IThriftStream read GetInputStream;
@@ -277,6 +318,7 @@
   strict protected
     procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
     function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
+    procedure CheckReadBytesAvailable( const value : Integer); override;
     procedure Open;  override;
     procedure Close; override;
     procedure Flush; override;
@@ -298,15 +340,19 @@
 {$ENDIF}
     FUseBufferedSocket : Boolean;
     FOwnsServer : Boolean;
+    FTransportControl : ITransportControl;
+
   strict protected
     function Accept( const fnAccepting: TProc) : ITransport; override;
+    property TransportControl : ITransportControl read FTransportControl;
+
   public
 {$IFDEF OLD_SOCKETS}
-    constructor Create( const aServer: TTcpServer; const aClientTimeout: Integer = 0); overload;
-    constructor Create( const aPort: Integer; const aClientTimeout: Integer = 0; const aUseBufferedSockets: Boolean = FALSE); overload;
+    constructor Create( const aServer: TTcpServer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
+    constructor Create( const aPort: Integer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aTransportCtl : ITransportControl = nil); overload;
 {$ELSE}
-    constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = 0); overload;
-    constructor Create( const aPort: Integer; const aClientTimeout: Longword = 0; const aUseBufferedSockets: Boolean = FALSE); overload;
+    constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
+    constructor Create( const aPort: Integer; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aTransportCtl : ITransportControl = nil); overload;
 {$ENDIF}
     destructor Destroy; override;
     procedure Listen; override;
@@ -325,6 +371,7 @@
   strict protected
     function GetIsOpen: Boolean; override;
     procedure Flush; override;
+    procedure CheckReadBytesAvailable( const value : Integer);  override;
   public
     type
       TFactory = class( TTransportFactoryImpl )
@@ -363,11 +410,11 @@
   public
     procedure Open; override;
 {$IFDEF OLD_SOCKETS}
-    constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = 0); overload;
-    constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = 0); overload;
+    constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
+    constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
 {$ELSE}
-    constructor Create( const aClient: TSocket; const aOwnsClient: Boolean); overload;
-    constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = 0); overload;
+    constructor Create(const aClient: TSocket; const aOwnsClient: Boolean; const aTransportCtl : ITransportControl = nil); overload;
+    constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
 {$ENDIF}
     destructor Destroy; override;
     procedure Close; override;
@@ -402,6 +449,7 @@
     function  Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
     procedure Write( const pBuf : Pointer; off, len : Integer); override;
     procedure Flush; override;
+    procedure CheckReadBytesAvailable( const value : Integer); override;
   public
     type
       TFactory = class( TTransportFactoryImpl )
@@ -409,20 +457,67 @@
         function GetTransport( const aTransport: ITransport): ITransport; override;
       end;
 
+    constructor Create( const aTransportCtl : ITransportControl); overload;
     constructor Create( const aTransport: ITransport); overload;
     destructor Destroy; override;
   end;
 
 
 const
-  DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
   DEFAULT_THRIFT_SECUREPROTOCOLS = [ TSecureProtocol.TLS_1_1, TSecureProtocol.TLS_1_2];
 
 implementation
 
 
+{ TTransportControlImpl }
+
+constructor TTransportControlImpl.Create( const aMaxMessageSize : Integer);
+begin
+  inherited Create;
+
+  if aMaxMessageSize > 0
+  then FMaxAllowedMsgSize := aMaxMessageSize
+  else FMaxAllowedMsgSize := DEFAULT_MAX_MESSAGE_SIZE;
+
+  ResetConsumedMessageSize;
+end;
+
+function TTransportControlImpl.MaxAllowedMessageSize : Integer;
+begin
+  result := FMaxAllowedMsgSize;
+end;
+
+procedure TTransportControlImpl.ResetConsumedMessageSize;
+begin
+  FRemainingMsgSize := MaxAllowedMessageSize;
+end;
+
+
+procedure TTransportControlImpl.ConsumeReadBytes( const count : Integer);
+begin
+  if FRemainingMsgSize >= count
+  then Dec( FRemainingMsgSize, count)
+  else begin
+    FRemainingMsgSize := 0;
+    if FRemainingMsgSize < count
+    then raise TTransportExceptionEndOfFile.Create('Maximum message size reached');
+  end;
+end;
+
+
 { TTransportImpl }
 
+constructor TTransportImpl.Create( const aTransportCtl : ITransportControl);
+begin
+  inherited Create;
+
+  if aTransportCtl <> nil
+  then FTransportControl := aTransportCtl
+  else FTransportControl := TTransportControlImpl.Create;
+  ASSERT( FTransportControl <> nil);
+end;
+
+
 procedure TTransportImpl.Flush;
 begin
   // nothing to do
@@ -477,6 +572,19 @@
 end;
 
 
+function TTransportImpl.TransportControl : ITransportControl;
+begin
+  result := FTransportControl;
+end;
+
+
+procedure TTransportImpl.ConsumeReadBytes( const count : Integer);
+begin
+  if FTransportControl <> nil
+  then FTransportControl.ConsumeReadBytes( count);
+end;
+
+
 { TTransportException }
 
 constructor TTransportException.HiddenCreate(const Msg: string);
@@ -571,13 +679,15 @@
 { TServerSocket }
 
 {$IFDEF OLD_SOCKETS}
-constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer);
+constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer; const aTransportCtl : ITransportControl);
 {$ELSE}
-constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword);
+constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword; const aTransportCtl : ITransportControl);
 {$ENDIF}
 begin
   inherited Create;
   FServer := aServer;
+  FTransportControl := aTransportCtl;
+  ASSERT( FTransportControl <> nil);
 
 {$IFDEF OLD_SOCKETS}
   FClientTimeout := aClientTimeout;
@@ -589,13 +699,18 @@
 
 
 {$IFDEF OLD_SOCKETS}
-constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; const aUseBufferedSockets: Boolean);
+constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; aUseBufferedSockets: Boolean; const aTransportCtl : ITransportControl);
 {$ELSE}
-constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; const aUseBufferedSockets: Boolean);
+constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; aUseBufferedSockets: Boolean; const aTransportCtl : ITransportControl);
 {$ENDIF}
 begin
   inherited Create;
 
+  if aTransportCtl <> nil
+  then FTransportControl := aTransportCtl
+  else FTransportControl := TTransportControlImpl.Create;
+  ASSERT( FTransportControl <> nil);
+
 {$IFDEF OLD_SOCKETS}
   FPort := aPort;
   FClientTimeout := aClientTimeout;
@@ -657,7 +772,7 @@
       Exit;
     end;
 
-    trans := TSocketImpl.Create( client, TRUE, FClientTimeout);
+    trans := TSocketImpl.Create( client, TRUE, FClientTimeout, TransportControl);
     client := nil;  // trans owns it now
 
     if FUseBufferedSocket
@@ -676,7 +791,7 @@
 
   client := FServer.Accept;
   try
-    trans := TSocketImpl.Create(client, MaxMessageSize, True);
+    trans := TSocketImpl.Create(client, True, TransportControl);
     client := nil;
 
     if FUseBufferedSocket then
@@ -725,9 +840,9 @@
 { TSocket }
 
 {$IFDEF OLD_SOCKETS}
-constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer);
+constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer; const aTransportCtl : ITransportControl);
 {$ELSE}
-constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean);
+constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean; const aTransportCtl : ITransportControl);
 {$ENDIF}
 var stream : IThriftStream;
 begin
@@ -741,16 +856,16 @@
 {$ENDIF}
 
   stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
-  inherited Create( stream, stream);
+  inherited Create( stream, stream, aTransportCtl);
 end;
 
 {$IFDEF OLD_SOCKETS}
-constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer);
+constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer; const aTransportCtl : ITransportControl);
 {$ELSE}
-constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword);
+constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword; const aTransportCtl : ITransportControl);
 {$ENDIF}
 begin
-  inherited Create(nil,nil);
+  inherited Create(nil,nil, aTransportCtl);
   FHost := aHost;
   FPort := aPort;
   FTimeout := aTimeout;
@@ -928,6 +1043,22 @@
 end;
 
 
+procedure TBufferedStreamImpl.CheckReadBytesAvailable( const value : Integer);
+var nRequired : Integer;
+begin
+  nRequired := value;
+
+  if FReadBuffer <> nil then begin
+    Dec( nRequired, (FReadBuffer.Position - FReadBuffer.Size));
+    if nRequired <= 0 then Exit;
+  end;
+
+  if FStream <> nil
+  then FStream.CheckReadBytesAvailable( nRequired)
+  else raise TTransportExceptionEndOfFile.Create('Not enough input data');
+end;
+
+
 function TBufferedStreamImpl.ToArray: TBytes;
 var len : Integer;
 begin
@@ -963,9 +1094,9 @@
 
 { TStreamTransportImpl }
 
-constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream);
+constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aTransportCtl : ITransportControl);
 begin
-  inherited Create;
+  inherited Create( aTransportCtl);
   FInputStream := aInputStream;
   FOutputStream := aOutputStream;
 end;
@@ -1018,6 +1149,7 @@
   then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
 
   Result := FInputStream.Read( pBuf,buflen, off, len );
+  ConsumeReadBytes( result);
 end;
 
 procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
@@ -1028,13 +1160,20 @@
   FOutputStream.Write( pBuf, off, len );
 end;
 
+procedure TStreamTransportImpl.CheckReadBytesAvailable( const value : Integer);
+begin
+  if FInputStream <> nil
+  then FInputStream.CheckReadBytesAvailable( value)
+  else raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
+end;
+
 
 { TBufferedTransportImpl }
 
 constructor TBufferedTransportImpl.Create( const aTransport : IStreamTransport; const aBufSize: Integer);
 begin
   ASSERT( aTransport <> nil);
-  inherited Create;
+  inherited Create( aTransport.TransportControl);
   FTransport := aTransport;
   FBufSize := aBufSize;
   InitBuffers;
@@ -1094,6 +1233,23 @@
   end;
 end;
 
+procedure TBufferedTransportImpl.CheckReadBytesAvailable( const value : Integer);
+var stm2 : IThriftStream2;
+    need : Integer;
+begin
+  need := value;
+
+  // buffered bytes
+  if Supports( FInputBuffer, IThriftStream2, stm2) then begin
+    Dec( need, stm2.Size - stm2.Position);
+    if need <= 0 then Exit;
+  end;
+
+  if FInputBuffer <> nil
+  then FInputBuffer.CheckReadBytesAvailable( need)
+  else raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
+end;
+
 { TBufferedTransportImpl.TFactory }
 
 function TBufferedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
@@ -1104,10 +1260,18 @@
 
 { TFramedTransportImpl }
 
+constructor TFramedTransportImpl.Create( const aTransportCtl : ITransportControl);
+begin
+  inherited Create( aTransportCtl);
+
+  InitMaxFrameSize;
+  InitWriteBuffer;
+end;
+
 constructor TFramedTransportImpl.Create( const aTransport: ITransport);
 begin
   ASSERT( aTransport <> nil);
-  inherited Create;
+  inherited Create( aTransport.TransportControl);
 
   InitMaxFrameSize;
   InitWriteBuffer;
@@ -1122,8 +1286,15 @@
 end;
 
 procedure TFramedTransportImpl.InitMaxFrameSize;
+var maxLen : Integer;
 begin
   FMaxFrameSize := DEFAULT_MAX_LENGTH;
+
+  // MaxAllowedMessageSize may be smaller, but not larger
+  if TransportControl <> nil then begin
+    maxLen := TransportControl.MaxAllowedMessageSize - SizeOf(TFramedHeader);
+    FMaxFrameSize := Min( FMaxFrameSize, maxLen);
+  end;
 end;
 
 procedure TFramedTransportImpl.Close;
@@ -1225,6 +1396,7 @@
     raise TTransportExceptionCorruptedData.Create('Frame size ('+IntToStr(size)+') larger than allowed maximum ('+IntToStr(FMaxFrameSize)+')');
   end;
 
+  FTransport.CheckReadBytesAvailable( size);
   SetLength( buff, size );
   FTransport.ReadAll( buff, 0, size );
 
@@ -1247,6 +1419,18 @@
 end;
 
 
+procedure TFramedTransportImpl.CheckReadBytesAvailable( const value : Integer);
+var nRemaining : Int64;
+begin
+  if FReadBuffer = nil
+  then raise TTransportExceptionEndOfFile.Create('Cannot read from null inputstream');
+
+  nRemaining := FReadBuffer.Size - FReadBuffer.Position;
+  if value > nRemaining
+  then raise TTransportExceptionEndOfFile.Create('Not enough input data');
+end;
+
+
 { TFramedTransport.TFactory }
 
 function TFramedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
@@ -1577,6 +1761,10 @@
 
 {$ENDIF}
 
+procedure TTcpSocketStreamImpl.CheckReadBytesAvailable( const value : Integer);
+begin
+  // we can't really tell, no further checks possible
+end;
 
 
 end.
diff --git a/lib/delphi/src/Thrift.WinHTTP.pas b/lib/delphi/src/Thrift.WinHTTP.pas
index 854d7c0..dc29bec 100644
--- a/lib/delphi/src/Thrift.WinHTTP.pas
+++ b/lib/delphi/src/Thrift.WinHTTP.pas
@@ -486,7 +486,7 @@
   IWinHTTPConnection = interface;
 
   IWinHTTPRequest = interface
-    ['{F65952F2-2F3B-47DC-B524-F1694E6D2AD7}']
+    ['{7862DC7C-3128-4AA1-B9B0-0EB0FE8B15B9}']
     function  Handle : HINTERNET;
     function  Connection : IWinHTTPConnection;
     function  AddRequestHeader( const aHeader : string; const addflag : DWORD = WINHTTP_ADDREQ_FLAG_ADD) : Boolean;
@@ -498,6 +498,7 @@
     function  FlushAndReceiveResponse : Boolean;
     function  ReadData( const dwRead : DWORD) : TBytes;  overload;
     function  ReadData( const pBuf : Pointer; const dwRead : DWORD) : DWORD;  overload;
+    function  QueryDataAvailable : DWORD;
   end;
 
   IWinHTTPConnection = interface
@@ -616,6 +617,7 @@
     function  FlushAndReceiveResponse : Boolean;
     function  ReadData( const dwRead : DWORD) : TBytes;  overload;
     function  ReadData( const pBuf : Pointer; const dwRead : DWORD) : DWORD;  overload;
+    function  QueryDataAvailable : DWORD;
 
   public
     constructor Create( const aConnection : IWinHTTPConnection;
@@ -1111,6 +1113,14 @@
 end;
 
 
+function TWinHTTPRequestImpl.QueryDataAvailable : DWORD;
+begin
+  if not WinHttpQueryDataAvailable( FHandle, result)
+  then result := 0;
+end;
+
+
+
 { TWinHTTPUrlImpl }
 
 constructor TWinHTTPUrlImpl.Create(const aUri: UnicodeString);
diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas
index a488cac..3562dab 100644
--- a/lib/delphi/test/TestClient.pas
+++ b/lib/delphi/test/TestClient.pas
@@ -1396,7 +1396,7 @@
   case FSetup.endpoint of
     trns_Sockets: begin
       Console.WriteLine('Using sockets ('+FSetup.host+' port '+IntToStr(FSetup.port)+')');
-      streamtrans := TSocketImpl.Create( FSetup.host, FSetup.port );
+      streamtrans := TSocketImpl.Create( FSetup.host, FSetup.port, DEFAULT_THRIFT_TIMEOUT);
       FTransport := streamtrans;
     end;
 
diff --git a/lib/delphi/test/TestServer.pas b/lib/delphi/test/TestServer.pas
index 2a80d52..a9c71fb 100644
--- a/lib/delphi/test/TestServer.pas
+++ b/lib/delphi/test/TestServer.pas
@@ -585,7 +585,7 @@
       trns_Sockets : begin
         Console.WriteLine('- sockets (port '+IntToStr(port)+')');
         if (trns_Buffered in layered) then Console.WriteLine('- buffered');
-        servertrans := TServerSocketImpl.Create( Port, 0, (trns_Buffered in layered));
+        servertrans := TServerSocketImpl.Create( Port, DEFAULT_THRIFT_TIMEOUT, (trns_Buffered in layered));
       end;
 
       trns_MsxmlHttp,
diff --git a/lib/delphi/test/serializer/TestSerializer.Tests.pas b/lib/delphi/test/serializer/TestSerializer.Tests.pas
index ec8d86d..fe69f4e 100644
--- a/lib/delphi/test/serializer/TestSerializer.Tests.pas
+++ b/lib/delphi/test/serializer/TestSerializer.Tests.pas
@@ -69,6 +69,9 @@
     class procedure Deserialize( const input : TBytes; const target : IBase; const factory : TFactoryPair);  overload;
     class procedure Deserialize( const input : TStream; const target : IBase; const factory : TFactoryPair);  overload;
 
+    class procedure ValidateReadToEnd( const input : TBytes; const serial : TDeserializer);  overload;
+    class procedure ValidateReadToEnd( const input : TStream; const serial : TDeserializer);  overload;
+
     procedure Test_Serializer_Deserializer;
     procedure Test_OneOfEach(     const method : TMethod; const factory : TFactoryPair; const stream : TFileStream);
     procedure Test_CompactStruct( const method : TMethod; const factory : TFactoryPair; const stream : TFileStream);
@@ -305,9 +308,10 @@
 class procedure TTestSerializer.Deserialize( const input : TBytes; const target : IBase; const factory : TFactoryPair);
 var serial : TDeserializer;
 begin
-  serial := TDeserializer.Create( factory.prot, factory.trans);
+  serial := TDeserializer.Create( factory.prot, factory.trans, Length(input));
   try
     serial.Deserialize( input, target);
+    ValidateReadToEnd( input, serial);
   finally
     serial.Free;
   end;
@@ -317,13 +321,44 @@
 class procedure TTestSerializer.Deserialize( const input : TStream; const target : IBase; const factory : TFactoryPair);
 var serial : TDeserializer;
 begin
-  serial := TDeserializer.Create( factory.prot, factory.trans);
+  serial := TDeserializer.Create( factory.prot, factory.trans, input.Size);
   try
     serial.Deserialize( input, target);
+    ValidateReadToEnd( input, serial);
   finally
     serial.Free;
   end;
 end;
 
 
+class procedure TTestSerializer.ValidateReadToEnd( const input : TBytes; const serial : TDeserializer);
+// we should not have any more byte to read
+var dummy : IBase;
+begin
+  try
+    dummy := TOneOfEachImpl.Create;
+    serial.Deserialize( input, dummy);
+    raise EInOutError.Create('Expected exception not thrown?');
+  except
+    on e:TTransportExceptionEndOfFile do {expected};
+    on e:Exception do raise; // unexpected
+  end;
+end;
+
+
+class procedure TTestSerializer.ValidateReadToEnd( const input : TStream; const serial : TDeserializer);
+// we should not have any more byte to read
+var dummy : IBase;
+begin
+  try
+    input.Position := 0;
+    dummy := TOneOfEachImpl.Create;
+    serial.Deserialize( input, dummy);
+    raise EInOutError.Create('Expected exception not thrown?');
+  except
+    on e:TTransportExceptionEndOfFile do {expected};
+    on e:Exception do raise; // unexpected
+  end;
+end;
+
 end.