THRIFT-5007 Implement MAX_MESSAGE_SIZE and remaining read bytes control
Client: Delphi
Patch: Jens Geyer
This closes #1932
diff --git a/lib/delphi/src/Thrift.Processor.Multiplex.pas b/lib/delphi/src/Thrift.Processor.Multiplex.pas
index 622f730..ba77d94 100644
--- a/lib/delphi/src/Thrift.Processor.Multiplex.pas
+++ b/lib/delphi/src/Thrift.Processor.Multiplex.pas
@@ -132,6 +132,7 @@
function TMultiplexedProcessorImpl.TStoredMessageProtocol.ReadMessageBegin: TThriftMessage;
begin
+ Reset;
result := FMessageBegin;
end;
diff --git a/lib/delphi/src/Thrift.Protocol.Compact.pas b/lib/delphi/src/Thrift.Protocol.Compact.pas
index 866bd26..109e660 100644
--- a/lib/delphi/src/Thrift.Protocol.Compact.pas
+++ b/lib/delphi/src/Thrift.Protocol.Compact.pas
@@ -133,8 +133,6 @@
constructor Create(const trans : ITransport);
destructor Destroy; override;
- procedure Reset;
-
strict private
procedure WriteByteDirect( const b : Byte); overload;
@@ -172,7 +170,7 @@
procedure WriteDouble( const dub: Double); override;
procedure WriteBinary( const b: TBytes); overload; override;
- private
+ private // unit visible stuff
class function DoubleToInt64Bits( const db : Double) : Int64;
class function Int64BitsToDouble( const i64 : Int64) : Double;
@@ -193,6 +191,10 @@
//Convert a Int64 into little-endian bytes in buf starting at off and going until off+7.
class procedure fixedLongToBytes( const n : Int64; var buf : TBytes);
+ strict protected
+ function GetMinSerializedSize( const aType : TType) : Integer; override;
+ procedure Reset; override;
+
public
function ReadMessageBegin: TThriftMessage; override;
procedure ReadMessageEnd(); override;
@@ -291,6 +293,7 @@
procedure TCompactProtocolImpl.Reset;
begin
+ inherited Reset;
lastField_.Clear();
lastFieldId_ := 0;
Init( booleanField_, '', TType.Stop, 0);
@@ -735,6 +738,7 @@
val := getTType( Byte( keyAndValueType and $F));
Init( result, key, val, size);
ASSERT( (result.KeyType = key) and (result.ValueType = val));
+ CheckReadBytesAvailable(result);
end;
@@ -755,6 +759,7 @@
type_ := getTType( size_and_type);
Init( result, type_, size);
+ CheckReadBytesAvailable(result);
end;
@@ -775,6 +780,7 @@
type_ := getTType( size_and_type);
Init( result, type_, size);
+ CheckReadBytesAvailable(result);
end;
@@ -836,6 +842,7 @@
var length : Integer;
begin
length := Integer( ReadVarint32);
+ FTrans.CheckReadBytesAvailable(length);
SetLength( result, length);
if (length > 0)
then Transport.ReadAll( result, 0, length);
@@ -968,6 +975,32 @@
end;
+function TCompactProtocolImpl.GetMinSerializedSize( const aType : TType) : Integer;
+// Return the minimum number of bytes a type will consume on the wire
+begin
+ case aType of
+ TType.Stop: result := 0;
+ TType.Void: result := 0;
+ TType.Bool_: result := SizeOf(Byte);
+ TType.Byte_: result := SizeOf(Byte);
+ TType.Double_: result := 8; // uses fixedLongToBytes() which always writes 8 bytes
+ TType.I16: result := SizeOf(Byte);
+ TType.I32: result := SizeOf(Byte);
+ TType.I64: result := SizeOf(Byte);
+ TType.String_: result := SizeOf(Byte); // string length
+ TType.Struct: result := 0; // empty struct
+ TType.Map: result := SizeOf(Byte); // element count
+ TType.Set_: result := SizeOf(Byte); // element count
+ TType.List: result := SizeOf(Byte); // element count
+ else
+ raise TTransportExceptionBadArgs.Create('Unhandled type code');
+ end;
+end;
+
+
+
+
+
//--- unit tests -------------------------------------------
{$IFDEF Debug}
diff --git a/lib/delphi/src/Thrift.Protocol.JSON.pas b/lib/delphi/src/Thrift.Protocol.JSON.pas
index 85cb973..e72a81d 100644
--- a/lib/delphi/src/Thrift.Protocol.JSON.pas
+++ b/lib/delphi/src/Thrift.Protocol.JSON.pas
@@ -132,6 +132,10 @@
procedure PushContext( const aCtx : TJSONBaseContext);
procedure PopContext;
+ strict protected
+ function GetMinSerializedSize( const aType : TType) : Integer; override;
+ procedure Reset; override;
+
public
// TJSONProtocolImpl Constructor
constructor Create( const aTrans : ITransport);
@@ -480,6 +484,13 @@
end;
+procedure TJSONProtocolImpl.Reset;
+begin
+ inherited Reset;
+ ResetContextStack;
+end;
+
+
procedure TJSONProtocolImpl.ResetContextStack;
begin
while FContextStack.Count > 0
@@ -683,6 +694,7 @@
procedure TJSONProtocolImpl.WriteMessageBegin( const aMsg : TThriftMessage);
begin
+ Reset;
ResetContextStack; // THRIFT-1473
WriteJSONArrayStart;
@@ -1053,6 +1065,7 @@
function TJSONProtocolImpl.ReadMessageBegin: TThriftMessage;
begin
+ Reset;
ResetContextStack; // THRIFT-1473
Init( result);
@@ -1123,6 +1136,8 @@
result.ValueType := GetTypeIDForTypeName( str);
result.Count := ReadJSONInteger;
+ CheckReadBytesAvailable(result);
+
ReadJSONObjectStart;
end;
@@ -1143,6 +1158,7 @@
str := SysUtils.TEncoding.UTF8.GetString( ReadJSONString(FALSE));
result.ElementType := GetTypeIDForTypeName( str);
result.Count := ReadJSONInteger;
+ CheckReadBytesAvailable(result);
end;
@@ -1161,6 +1177,7 @@
str := SysUtils.TEncoding.UTF8.GetString( ReadJSONString(FALSE));
result.ElementType := GetTypeIDForTypeName( str);
result.Count := ReadJSONInteger;
+ CheckReadBytesAvailable(result);
end;
@@ -1218,6 +1235,30 @@
end;
+function TJSONProtocolImpl.GetMinSerializedSize( const aType : TType) : Integer;
+// Return the minimum number of bytes a type will consume on the wire
+begin
+ case aType of
+ TType.Stop: result := 0;
+ TType.Void: result := 0;
+ TType.Bool_: result := 1;
+ TType.Byte_: result := 1;
+ TType.Double_: result := 1;
+ TType.I16: result := 1;
+ TType.I32: result := 1;
+ TType.I64: result := 1;
+ TType.String_: result := 2; // empty string
+ TType.Struct: result := 2; // empty struct
+ TType.Map: result := 2; // empty map
+ TType.Set_: result := 2; // empty set
+ TType.List: result := 2; // empty list
+ else
+ raise TTransportExceptionBadArgs.Create('Unhandled type code');
+ end;
+end;
+
+
+
//--- init code ---
procedure InitBytes( var b : TBytes; aData : array of Byte);
diff --git a/lib/delphi/src/Thrift.Protocol.pas b/lib/delphi/src/Thrift.Protocol.pas
index 7c80221..94e6e18 100644
--- a/lib/delphi/src/Thrift.Protocol.pas
+++ b/lib/delphi/src/Thrift.Protocol.pas
@@ -196,7 +196,7 @@
end;
IProtocol = interface
- ['{602A7FFB-0D9E-4CD8-8D7F-E5076660588A}']
+ ['{7F3640D7-5082-49E7-B562-84202F323C3A}']
function GetTransport: ITransport;
procedure WriteMessageBegin( const msg: TThriftMessage);
procedure WriteMessageEnd;
@@ -248,6 +248,7 @@
function NextRecursionLevel : IProtocolRecursionTracker;
procedure IncrementRecursionDepth;
procedure DecrementRecursionDepth;
+ function GetMinSerializedSize( const aType : TType) : Integer;
property Transport: ITransport read GetTransport;
property RecursionLimit : Integer read GetRecursionLimit write SetRecursionLimit;
@@ -265,6 +266,12 @@
procedure IncrementRecursionDepth;
procedure DecrementRecursionDepth;
+ function GetMinSerializedSize( const aType : TType) : Integer; virtual; abstract;
+ procedure CheckReadBytesAvailable( const value : TThriftList); overload; inline;
+ procedure CheckReadBytesAvailable( const value : TThriftSet); overload; inline;
+ procedure CheckReadBytesAvailable( const value : TThriftMap); overload; inline;
+
+ procedure Reset; virtual;
function GetTransport: ITransport;
public
procedure WriteMessageBegin( const msg: TThriftMessage); virtual; abstract;
@@ -332,6 +339,7 @@
strict protected
FStrictRead : Boolean;
FStrictWrite : Boolean;
+ function GetMinSerializedSize( const aType : TType) : Integer; override;
strict private
function ReadAll( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer ): Integer; inline;
@@ -404,6 +412,9 @@
strict private
FWrappedProtocol : IProtocol;
+ strict protected
+ function GetMinSerializedSize( const aType : TType) : Integer; override;
+
public
// Encloses the specified protocol.
// All operations will be forward to the given protocol. Must be non-null.
@@ -583,6 +594,12 @@
Result := FTrans;
end;
+procedure TProtocolImpl.Reset;
+begin
+ if FTrans.TransportControl <> nil
+ then FTrans.TransportControl.ResetConsumedMessageSize;
+end;
+
function TProtocolImpl.ReadAnsiString: AnsiString;
var
b : TBytes;
@@ -623,6 +640,29 @@
WriteBinary( b );
end;
+
+procedure TProtocolImpl.CheckReadBytesAvailable( const value : TThriftList);
+begin
+ FTrans.CheckReadBytesAvailable( value.Count * GetMinSerializedSize(value.ElementType));
+end;
+
+
+procedure TProtocolImpl.CheckReadBytesAvailable( const value : TThriftSet);
+begin
+ FTrans.CheckReadBytesAvailable( value.Count * GetMinSerializedSize(value.ElementType));
+end;
+
+
+procedure TProtocolImpl.CheckReadBytesAvailable( const value : TThriftMap);
+var nPairSize : Integer
+;
+begin
+ nPairSize := GetMinSerializedSize(value.KeyType) + GetMinSerializedSize(value.ValueType);
+ FTrans.CheckReadBytesAvailable( value.Count * nPairSize);
+end;
+
+
+
{ TProtocolUtil }
class procedure TProtocolUtil.Skip( prot: IProtocol; type_: TType);
@@ -705,6 +745,7 @@
buf : TBytes;
begin
size := ReadI32;
+ FTrans.CheckReadBytesAvailable( size);
SetLength( buf, size);
FTrans.ReadAll( buf, 0, size);
Result := buf;
@@ -777,6 +818,7 @@
begin
result.ElementType := TType(ReadByte);
result.Count := ReadI32;
+ CheckReadBytesAvailable(result);
end;
procedure TBinaryProtocolImpl.ReadListEnd;
@@ -789,6 +831,7 @@
result.KeyType := TType(ReadByte);
result.ValueType := TType(ReadByte);
result.Count := ReadI32;
+ CheckReadBytesAvailable(result);
end;
procedure TBinaryProtocolImpl.ReadMapEnd;
@@ -801,6 +844,7 @@
size : Integer;
version : Integer;
begin
+ Reset;
Init( result);
size := ReadI32;
if (size < 0) then begin
@@ -832,6 +876,7 @@
begin
result.ElementType := TType(ReadByte);
result.Count := ReadI32;
+ CheckReadBytesAvailable(result);
end;
procedure TBinaryProtocolImpl.ReadSetEnd;
@@ -842,6 +887,7 @@
function TBinaryProtocolImpl.ReadStringBody( size: Integer): string;
var buf : TBytes;
begin
+ FTrans.CheckReadBytesAvailable( size);
SetLength( buf, size);
FTrans.ReadAll( buf, 0, size );
Result := TEncoding.UTF8.GetString( buf);
@@ -959,6 +1005,7 @@
procedure TBinaryProtocolImpl.WriteMessageBegin( const msg: TThriftMessage);
var version : Cardinal;
begin
+ Reset;
if FStrictWrite then begin
version := VERSION_1 or Cardinal( msg.Type_);
WriteI32( Integer( version) );
@@ -997,6 +1044,29 @@
end;
+function TBinaryProtocolImpl.GetMinSerializedSize( const aType : TType) : Integer;
+// Return the minimum number of bytes a type will consume on the wire
+begin
+ case aType of
+ TType.Stop: result := 0;
+ TType.Void: result := 0;
+ TType.Bool_: result := SizeOf(Byte);
+ TType.Byte_: result := SizeOf(Byte);
+ TType.Double_: result := SizeOf(Double);
+ TType.I16: result := SizeOf(Int16);
+ TType.I32: result := SizeOf(Int32);
+ TType.I64: result := SizeOf(Int64);
+ TType.String_: result := SizeOf(Int32); // string length
+ TType.Struct: result := 0; // empty struct
+ TType.Map: result := SizeOf(Int32); // element count
+ TType.Set_: result := SizeOf(Int32); // element count
+ TType.List: result := SizeOf(Int32); // element count
+ else
+ raise TTransportExceptionBadArgs.Create('Unhandled type code');
+ end;
+end;
+
+
{ TProtocolException }
constructor TProtocolException.HiddenCreate(const Msg: string);
@@ -1363,6 +1433,12 @@
end;
+function TProtocolDecorator.GetMinSerializedSize( const aType : TType) : Integer;
+begin
+ result := FWrappedProtocol.GetMinSerializedSize(aType);
+end;
+
+
{ Init helper functions }
procedure Init( var rec : TThriftMessage; const AName: string; const AMessageType: TMessageType; const ASeqID: Integer);
diff --git a/lib/delphi/src/Thrift.Serializer.pas b/lib/delphi/src/Thrift.Serializer.pas
index b95cf61..1cbcbec 100644
--- a/lib/delphi/src/Thrift.Serializer.pas
+++ b/lib/delphi/src/Thrift.Serializer.pas
@@ -71,15 +71,18 @@
public
// Create a new TDeserializer that uses the TBinaryProtocol by default.
- constructor Create; overload;
+ constructor Create( const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE); overload;
// Create a new TDeserializer.
// It will use the TProtocol specified by the factory that is passed in.
- constructor Create( const factory : IProtocolFactory); overload;
+ constructor Create( const factory : IProtocolFactory;
+ const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE); overload;
// Create a new TDeserializer.
// It will use the TProtocol and layered transports specified by the factories that are passed in.
- constructor Create( const protfact : IProtocolFactory; const transfact : ITransportFactory); overload;
+ constructor Create( const protfact : IProtocolFactory;
+ const transfact : ITransportFactory;
+ const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE); overload;
// DTOR
destructor Destroy; override;
@@ -122,7 +125,7 @@
inherited Create;
FStream := TMemoryStream.Create;
adapter := TThriftStreamAdapterDelphi.Create( FStream, FALSE);
- FTransport := TStreamTransportImpl.Create( nil, adapter);
+ FTransport := TStreamTransportImpl.Create( nil, adapter, TTransportControlImpl.Create(0)); // we don't read anything here
if transfact <> nil then FTransport := transfact.GetTransport( FTransport);
FProtocol := protfact.GetProtocol( FTransport);
@@ -185,24 +188,26 @@
{ TDeserializer }
-constructor TDeserializer.Create;
+constructor TDeserializer.Create( const aMaxMessageSize : Integer);
// Create a new TDeserializer that uses the TBinaryProtocol by default.
begin
//no inherited;
- Create( TBinaryProtocolImpl.TFactory.Create, nil);
+ Create( TBinaryProtocolImpl.TFactory.Create, nil, aMaxMessageSize);
end;
-constructor TDeserializer.Create( const factory : IProtocolFactory);
+constructor TDeserializer.Create( const factory : IProtocolFactory; const aMaxMessageSize : Integer);
// Create a new TDeserializer.
// It will use the TProtocol specified by the factory that is passed in.
begin
//no inherited;
- Create( factory, nil);
+ Create( factory, nil, aMaxMessageSize);
end;
-constructor TDeserializer.Create( const protfact : IProtocolFactory; const transfact : ITransportFactory);
+constructor TDeserializer.Create( const protfact : IProtocolFactory;
+ const transfact : ITransportFactory;
+ const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE);
// Create a new TDeserializer.
// It will use the TProtocol specified by the factory that is passed in.
var adapter : IThriftStream;
@@ -210,7 +215,7 @@
inherited Create;
FStream := TMemoryStream.Create;
adapter := TThriftStreamAdapterDelphi.Create( FStream, FALSE);
- FTransport := TStreamTransportImpl.Create( adapter, nil);
+ FTransport := TStreamTransportImpl.Create( adapter, nil, TTransportControlImpl.Create(aMaxMessageSize));
if transfact <> nil then FTransport := transfact.GetTransport( FTransport);
FProtocol := protfact.GetProtocol( FTransport);
diff --git a/lib/delphi/src/Thrift.Stream.pas b/lib/delphi/src/Thrift.Stream.pas
index 7cb9219..0f4e723 100644
--- a/lib/delphi/src/Thrift.Stream.pas
+++ b/lib/delphi/src/Thrift.Stream.pas
@@ -37,11 +37,12 @@
type
IThriftStream = interface
- ['{2A77D916-7446-46C1-8545-0AEC0008DBCA}']
+ ['{DBE61E28-2A77-42DB-A5A3-3CCB8A2D09FA}']
procedure Write( const buffer: TBytes; offset: Integer; count: Integer); overload;
procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); overload;
function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; overload;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; overload;
+ procedure CheckReadBytesAvailable( const value : Integer);
procedure Open;
procedure Close;
procedure Flush;
@@ -49,14 +50,24 @@
function ToArray: TBytes;
end;
- TThriftStreamImpl = class( TInterfacedObject, IThriftStream)
+
+ IThriftStream2 = interface( IThriftStream)
+ ['{1F55D9FE-F617-4B80-B8CA-4A300D8E33F6}']
+ function Size : Int64;
+ function Position : Int64;
+ end;
+
+
+ TThriftStreamImpl = class abstract( TInterfacedObject, IThriftStream)
strict private
procedure CheckSizeAndOffset( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer); overload;
strict protected
+ // IThriftStream
procedure Write( const buffer: TBytes; offset: Integer; count: Integer); overload; inline;
procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); overload; virtual;
function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; overload; inline;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; overload; virtual;
+ procedure CheckReadBytesAvailable( const value : Integer); virtual; abstract;
procedure Open; virtual; abstract;
procedure Close; virtual; abstract;
procedure Flush; virtual; abstract;
@@ -64,40 +75,54 @@
function ToArray: TBytes; virtual; abstract;
end;
- TThriftStreamAdapterDelphi = class( TThriftStreamImpl )
+ TThriftStreamAdapterDelphi = class( TThriftStreamImpl, IThriftStream2)
strict private
FStream : TStream;
FOwnsStream : Boolean;
strict protected
+ // IThriftStream
procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
+ procedure CheckReadBytesAvailable( const value : Integer); override;
procedure Open; override;
procedure Close; override;
procedure Flush; override;
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
+
+ // IThriftStream2
+ function Size : Int64;
+ function Position : Int64;
public
- constructor Create( const AStream: TStream; AOwnsStream : Boolean);
+ constructor Create( const aStream: TStream; aOwnsStream : Boolean);
destructor Destroy; override;
end;
- TThriftStreamAdapterCOM = class( TThriftStreamImpl)
+ TThriftStreamAdapterCOM = class( TThriftStreamImpl, IThriftStream2)
strict private
FStream : IStream;
strict protected
+ // IThriftStream
procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
+ procedure CheckReadBytesAvailable( const value : Integer); override;
procedure Open; override;
procedure Close; override;
procedure Flush; override;
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
+
+ // IThriftStream2
+ function Size : Int64;
+ function Position : Int64;
public
- constructor Create( const AStream: IStream);
+ constructor Create( const aStream: IStream);
end;
implementation
+uses Thrift.Transport;
+
{ TThriftStreamAdapterCOM }
procedure TThriftStreamAdapterCOM.Close;
@@ -105,10 +130,10 @@
FStream := nil;
end;
-constructor TThriftStreamAdapterCOM.Create( const AStream: IStream);
+constructor TThriftStreamAdapterCOM.Create( const aStream: IStream);
begin
inherited Create;
- FStream := AStream;
+ FStream := aStream;
end;
procedure TThriftStreamAdapterCOM.Flush;
@@ -120,6 +145,24 @@
end;
end;
+function TThriftStreamAdapterCOM.Size : Int64;
+var statstg: TStatStg;
+begin
+ FillChar( statstg, SizeOf( statstg), 0);
+ if IsOpen
+ and Succeeded( FStream.Stat( statstg, STATFLAG_NONAME ))
+ then result := statstg.cbSize
+ else result := 0;
+end;
+
+function TThriftStreamAdapterCOM.Position : Int64;
+var newpos : {$IF CompilerVersion >= 29.0} UInt64 {$ELSE} Int64 {$IFEND};
+begin
+ if SUCCEEDED( FStream.Seek( 0, STREAM_SEEK_CUR, newpos))
+ then result := Int64(newpos)
+ else raise TTransportExceptionEndOfFile.Create('Seek() error');
+end;
+
function TThriftStreamAdapterCOM.IsOpen: Boolean;
begin
Result := FStream <> nil;
@@ -148,21 +191,21 @@
end;
end;
+procedure TThriftStreamAdapterCOM.CheckReadBytesAvailable( const value : Integer);
+var nRemaining : Int64;
+begin
+ nRemaining := Self.Size - Self.Position;
+ if nRemaining < value
+ then raise TTransportExceptionEndOfFile.Create('Not enough input data');
+end;
+
function TThriftStreamAdapterCOM.ToArray: TBytes;
var
- statstg: TStatStg;
- len : Integer;
+ len : Int64;
NewPos : {$IF CompilerVersion >= 29.0} UInt64 {$ELSE} Int64 {$IFEND};
cbRead : Integer;
begin
- FillChar( statstg, SizeOf( statstg), 0);
- len := 0;
- if IsOpen then begin
- if Succeeded( FStream.Stat( statstg, STATFLAG_NONAME )) then begin
- len := statstg.cbSize;
- end;
- end;
-
+ len := Self.Size;
SetLength( Result, len );
if len > 0 then begin
@@ -226,6 +269,21 @@
{ TThriftStreamAdapterDelphi }
+constructor TThriftStreamAdapterDelphi.Create( const aStream: TStream; aOwnsStream: Boolean);
+begin
+ inherited Create;
+ FStream := aStream;
+ FOwnsStream := aOwnsStream;
+end;
+
+destructor TThriftStreamAdapterDelphi.Destroy;
+begin
+ if FOwnsStream
+ then Close;
+
+ inherited;
+end;
+
procedure TThriftStreamAdapterDelphi.Close;
begin
FStream.Free;
@@ -233,26 +291,21 @@
FOwnsStream := False;
end;
-constructor TThriftStreamAdapterDelphi.Create( const AStream: TStream; AOwnsStream: Boolean);
-begin
- inherited Create;
- FStream := AStream;
- FOwnsStream := AOwnsStream;
-end;
-
-destructor TThriftStreamAdapterDelphi.Destroy;
-begin
- if FOwnsStream
- then Close;
-
- inherited;
-end;
-
procedure TThriftStreamAdapterDelphi.Flush;
begin
// nothing to do
end;
+function TThriftStreamAdapterDelphi.Size : Int64;
+begin
+ result := FStream.Size;
+end;
+
+function TThriftStreamAdapterDelphi.Position : Int64;
+begin
+ result := FStream.Position;
+end;
+
function TThriftStreamAdapterDelphi.IsOpen: Boolean;
begin
Result := FStream <> nil;
@@ -279,16 +332,21 @@
else Result := 0;
end;
+procedure TThriftStreamAdapterDelphi.CheckReadBytesAvailable( const value : Integer);
+var nRemaining : Int64;
+begin
+ nRemaining := FStream.Size - FStream.Position;
+ if nRemaining < value then raise TTransportExceptionEndOfFile.Create('Not enough input data');
+end;
+
function TThriftStreamAdapterDelphi.ToArray: TBytes;
var
OrgPos : Integer;
len : Integer;
begin
- len := 0;
- if FStream <> nil then
- begin
- len := FStream.Size;
- end;
+ if FStream <> nil
+ then len := FStream.Size
+ else len := 0;
SetLength( Result, len );
diff --git a/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas b/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas
index 87bf23b..b92cce1 100644
--- a/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas
+++ b/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas
@@ -59,6 +59,7 @@
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
procedure Flush; override;
+ procedure CheckReadBytesAvailable( const value : Integer); override;
procedure SetDnsResolveTimeout(const Value: Integer);
function GetDnsResolveTimeout: Integer;
@@ -80,7 +81,7 @@
property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
public
- constructor Create( const AUri: string);
+ constructor Create( const AUri: string; const aTransportCtl : ITransportControl = nil);
destructor Destroy; override;
end;
@@ -90,9 +91,9 @@
{ TMsxmlHTTPClientImpl }
-constructor TMsxmlHTTPClientImpl.Create(const AUri: string);
+constructor TMsxmlHTTPClientImpl.Create(const AUri: string; const aTransportCtl : ITransportControl);
begin
- inherited Create;
+ inherited Create( aTransportCtl);
FUri := AUri;
// defaults according to MSDN
@@ -218,6 +219,13 @@
end;
end;
+procedure TMsxmlHTTPClientImpl.CheckReadBytesAvailable( const value : Integer);
+begin
+ if FInputStream <> nil
+ then FInputStream.CheckReadBytesAvailable( value)
+ else raise TTransportExceptionNotOpen.Create('No request has been sent');
+end;
+
function TMsxmlHTTPClientImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
if FInputStream = nil then begin
@@ -225,7 +233,8 @@
end;
try
- Result := FInputStream.Read( pBuf, buflen, off, len)
+ Result := FInputStream.Read( pBuf, buflen, off, len);
+ ConsumeReadBytes( result);
except
on E: Exception
do raise TTransportExceptionUnknown.Create(E.Message);
diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index 9368f2f..b602b64 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -53,6 +53,7 @@
//procedure Open; override; - see derived classes
procedure Close; override;
procedure Flush; override;
+ procedure CheckReadBytesAvailable( const value : Integer); override;
function ReadDirect( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; overload;
function ReadOverlapped( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; overload;
@@ -123,13 +124,17 @@
TNamedPipeTransportClientEndImpl = class( TPipeTransportBase)
public
// Named pipe constructors
- constructor Create( aPipe : THandle; aOwnsHandle : Boolean;
- const aTimeOut : DWORD); overload;
+ constructor Create( const aPipe : THandle;
+ const aOwnsHandle : Boolean;
+ const aTimeOut : DWORD;
+ const aTransportCtl : ITransportControl); overload;
+
constructor Create( const aPipeName : string;
const aShareMode: DWORD = 0;
const aSecurityAttributes: PSecurityAttributes = nil;
const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT;
- const aOpenTimeOut : DWORD = DEFAULT_THRIFT_PIPE_OPEN_TIMEOUT); overload;
+ const aOpenTimeOut : DWORD = DEFAULT_THRIFT_PIPE_OPEN_TIMEOUT;
+ const aTransportCtl : ITransportControl = nil); overload;
end;
@@ -139,17 +144,20 @@
public
// ITransport
procedure Close; override;
- constructor Create( aPipe : THandle; aOwnsHandle : Boolean;
- const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT); reintroduce;
+ constructor Create( const aPipe : THandle;
+ const aOwnsHandle : Boolean;
+ const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT;
+ const aTransportCtl : ITransportControl = nil); reintroduce;
end;
TAnonymousPipeTransportImpl = class( TPipeTransportBase)
public
// Anonymous pipe constructor
- constructor Create(const aPipeRead, aPipeWrite : THandle;
- aOwnsHandles : Boolean;
- const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT); overload;
+ constructor Create( const aPipeRead, aPipeWrite : THandle;
+ const aOwnsHandles : Boolean;
+ const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT;
+ const aTransportCtl : ITransportControl = nil); overload;
end;
@@ -310,6 +318,12 @@
end;
+procedure TPipeStreamBase.CheckReadBytesAvailable( const value : Integer);
+begin
+ // can't tell how much we can suck out of the pipe
+end;
+
+
procedure TPipeStreamBase.Write( const pBuf : Pointer; offset, count : Integer);
begin
if FOverlapped
@@ -642,21 +656,24 @@
constructor TNamedPipeTransportClientEndImpl.Create( const aPipeName : string; const aShareMode: DWORD;
- const aSecurityAttributes: PSecurityAttributes;
- const aTimeOut, aOpenTimeOut : DWORD);
+ const aSecurityAttributes: PSecurityAttributes;
+ const aTimeOut, aOpenTimeOut : DWORD;
+ const aTransportCtl : ITransportControl);
// Named pipe constructor
begin
- inherited Create( nil, nil);
+ inherited Create( nil, nil, aTransportCtl);
FInputStream := TNamedPipeStreamImpl.Create( aPipeName, TRUE, aShareMode, aSecurityAttributes, aTimeOut, aOpenTimeOut);
FOutputStream := FInputStream; // true for named pipes
end;
-constructor TNamedPipeTransportClientEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean;
- const aTimeOut : DWORD);
+constructor TNamedPipeTransportClientEndImpl.Create( const aPipe : THandle;
+ const aOwnsHandle : Boolean;
+ const aTimeOut : DWORD;
+ const aTransportCtl : ITransportControl);
// Named pipe constructor
begin
- inherited Create( nil, nil);
+ inherited Create( nil, nil, aTransportCtl);
FInputStream := THandlePipeStreamImpl.Create( aPipe, TRUE, aOwnsHandle, aTimeOut);
FOutputStream := FInputStream; // true for named pipes
end;
@@ -665,12 +682,14 @@
{ TNamedPipeTransportServerEndImpl }
-constructor TNamedPipeTransportServerEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean;
- const aTimeOut : DWORD);
+constructor TNamedPipeTransportServerEndImpl.Create( const aPipe : THandle;
+ const aOwnsHandle : Boolean;
+ const aTimeOut : DWORD;
+ const aTransportCtl : ITransportControl);
// Named pipe constructor
begin
FHandle := DuplicatePipeHandle( aPipe);
- inherited Create( aPipe, aOwnsHandle, aTimeOut);
+ inherited Create( aPipe, aOwnsHandle, aTimeOut, aTransportCtl);
end;
@@ -688,11 +707,12 @@
constructor TAnonymousPipeTransportImpl.Create( const aPipeRead, aPipeWrite : THandle;
- aOwnsHandles : Boolean;
- const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT);
+ const aOwnsHandles : Boolean;
+ const aTimeOut : DWORD;
+ const aTransportCtl : ITransportControl);
// Anonymous pipe constructor
begin
- inherited Create( nil, nil);
+ inherited Create( nil, nil, aTransportCtl);
// overlapped is not supported with AnonPipes, see MSDN
FInputStream := THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles, FALSE, aTimeOut);
FOutputStream := THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles, FALSE, aTimeOut);
diff --git a/lib/delphi/src/Thrift.Transport.WinHTTP.pas b/lib/delphi/src/Thrift.Transport.WinHTTP.pas
index 0bf7e45..ec8c87f 100644
--- a/lib/delphi/src/Thrift.Transport.WinHTTP.pas
+++ b/lib/delphi/src/Thrift.Transport.WinHTTP.pas
@@ -58,16 +58,19 @@
THTTPResponseStream = class( TThriftStreamImpl)
strict private
FRequest : IWinHTTPRequest;
+ FTransportControl : ITransportControl;
strict protected
procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
+ procedure CheckReadBytesAvailable( const value : Integer); override;
+ procedure ConsumeReadBytes( const count : Integer);
procedure Open; override;
procedure Close; override;
procedure Flush; override;
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
public
- constructor Create( const aRequest : IWinHTTPRequest);
+ constructor Create( const aRequest : IWinHTTPRequest; const aTransportCtl : ITransportControl);
destructor Destroy; override;
end;
@@ -78,6 +81,7 @@
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
procedure Flush; override;
+ procedure CheckReadBytesAvailable( const value : Integer); override;
procedure SetDnsResolveTimeout(const Value: Integer);
function GetDnsResolveTimeout: Integer;
@@ -99,7 +103,7 @@
property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
public
- constructor Create( const AUri: string);
+ constructor Create( const AUri: string; const aTransportCtl : ITransportControl = nil);
destructor Destroy; override;
end;
@@ -108,9 +112,9 @@
{ TWinHTTPClientImpl }
-constructor TWinHTTPClientImpl.Create(const AUri: string);
+constructor TWinHTTPClientImpl.Create(const AUri: string; const aTransportCtl : ITransportControl);
begin
- inherited Create;
+ inherited Create( aTransportCtl);
FUri := AUri;
// defaults according to MSDN
@@ -284,6 +288,13 @@
end;
end;
+procedure TWinHTTPClientImpl.CheckReadBytesAvailable( const value : Integer);
+begin
+ if FInputStream <> nil
+ then FInputStream.CheckReadBytesAvailable( value)
+ else raise TTransportExceptionNotOpen.Create('No request has been sent');
+end;
+
function TWinHTTPClientImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
if FInputStream = nil then begin
@@ -291,7 +302,8 @@
end;
try
- Result := FInputStream.Read( pBuf, buflen, off, len)
+ Result := FInputStream.Read( pBuf, buflen, off, len);
+ ConsumeReadBytes( result);
except
on E: Exception
do raise TTransportExceptionUnknown.Create(E.Message);
@@ -301,6 +313,7 @@
procedure TWinHTTPClientImpl.SendRequest;
var
http : IWinHTTPRequest;
+ ctrl : ITransportControl;
pData : PByte;
len : Integer;
error : Cardinal;
@@ -327,7 +340,8 @@
else raise TTransportExceptionInterrupted.Create( sMsg);
end;
- FInputStream := THTTPResponseStream.Create(http);
+ ctrl := TTransportControlImpl.Create( TransportControl.MaxAllowedMessageSize);
+ FInputStream := THTTPResponseStream.Create( http, ctrl);
end;
procedure TWinHTTPClientImpl.Write( const pBuf : Pointer; off, len : Integer);
@@ -341,10 +355,12 @@
{ TWinHTTPClientImpl.THTTPResponseStream }
-constructor TWinHTTPClientImpl.THTTPResponseStream.Create( const aRequest : IWinHTTPRequest);
+constructor TWinHTTPClientImpl.THTTPResponseStream.Create( const aRequest : IWinHTTPRequest; const aTransportCtl : ITransportControl);
begin
inherited Create;
FRequest := aRequest;
+ FTransportControl := aTransportCtl;
+ ASSERT( FTransportControl <> nil);
end;
destructor TWinHTTPClientImpl.THTTPResponseStream.Destroy;
@@ -390,6 +406,8 @@
if count >= buflen-offset
then count := buflen-offset;
+ CheckReadBytesAvailable(count);
+
if count > 0 then begin
pTmp := pBuf;
Inc( pTmp, offset);
@@ -397,6 +415,20 @@
ASSERT( Result >= 0);
end
else Result := 0;
+
+ ConsumeReadBytes( result);
+end;
+
+procedure TWinHTTPClientImpl.THTTPResponseStream.ConsumeReadBytes( const count : Integer);
+begin
+ if FTransportControl <> nil
+ then FTransportControl.ConsumeReadBytes( count);
+end;
+
+procedure TWinHTTPClientImpl.THTTPResponseStream.CheckReadBytesAvailable( const value : Integer);
+begin
+ if Int64(value) > Int64(FRequest.QueryDataAvailable)
+ then raise TTransportExceptionEndOfFile.Create('Not enough input data');
end;
function TWinHTTPClientImpl.THTTPResponseStream.ToArray: TBytes;
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
index bede57c..a3476bf 100644
--- a/lib/delphi/src/Thrift.Transport.pas
+++ b/lib/delphi/src/Thrift.Transport.pas
@@ -44,14 +44,39 @@
Thrift.WinHTTP,
Thrift.Stream;
+const
+ DEFAULT_MAX_MESSAGE_SIZE = 100 * 1024 * 1024; // 100 MB
+ DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
+
type
+ ITransportControl = interface
+ ['{CDA35E2C-F1D2-4BE3-9927-7F1540923265}']
+ function MaxAllowedMessageSize : Integer;
+ procedure ConsumeReadBytes( const count : Integer);
+ procedure ResetConsumedMessageSize;
+ end;
+
+ TTransportControlImpl = class( TInterfacedObject, ITransportControl)
+ strict private
+ FMaxAllowedMsgSize : Integer;
+ FRemainingMsgSize : Integer;
+ strict protected
+ // ITransportControl
+ function MaxAllowedMessageSize : Integer;
+ procedure ConsumeReadBytes( const count : Integer);
+ procedure ResetConsumedMessageSize;
+ public
+ constructor Create( const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE); reintroduce;
+ end;
+
ITransport = interface
- ['{DB84961E-8BB3-4532-99E1-A8C7AC2300F7}']
+ ['{938F6EB5-1848-43D5-8AC4-07633C55B229}']
function GetIsOpen: Boolean;
property IsOpen: Boolean read GetIsOpen;
function Peek: Boolean;
procedure Open;
procedure Close;
+
function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
@@ -61,15 +86,22 @@
procedure Write( const pBuf : Pointer; off, len : Integer); overload;
procedure Write( const pBuf : Pointer; len : Integer); overload;
procedure Flush;
+
+ function TransportControl : ITransportControl;
+ procedure CheckReadBytesAvailable( const value : Integer);
end;
TTransportImpl = class( TInterfacedObject, ITransport)
+ strict private
+ FTransportControl : ITransportControl;
+
strict protected
function GetIsOpen: Boolean; virtual; abstract;
property IsOpen: Boolean read GetIsOpen;
function Peek: Boolean; virtual;
procedure Open(); virtual; abstract;
procedure Close(); virtual; abstract;
+
function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual; abstract;
function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
@@ -79,6 +111,13 @@
procedure Write( const pBuf : Pointer; len : Integer); overload; inline;
procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract;
procedure Flush; virtual;
+
+ function TransportControl : ITransportControl; inline;
+ procedure ConsumeReadBytes( const count : Integer); inline;
+ procedure CheckReadBytesAvailable( const value : Integer); virtual; abstract;
+
+ public
+ constructor Create( const aTransportCtl : ITransportControl); reintroduce;
end;
TTransportException = class abstract( TException)
@@ -98,9 +137,9 @@
constructor HiddenCreate(const Msg: string);
class function GetType: TExceptionType; virtual; abstract;
public
- class function Create( AType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
+ class function Create( aType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
class function Create( const msg: string): TTransportException; reintroduce; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
- class function Create( AType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
+ class function Create( aType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
property Type_: TExceptionType read GetType;
end;
@@ -196,7 +235,7 @@
ITransportFactory = interface
['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
- function GetTransport( const ATrans: ITransport): ITransport;
+ function GetTransport( const aTransport: ITransport): ITransport;
end;
TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
@@ -222,6 +261,7 @@
strict protected
procedure Write( const pBuf : Pointer; offset, count: Integer); override;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
+ procedure CheckReadBytesAvailable( const value : Integer); override;
procedure Open; override;
procedure Close; override;
procedure Flush; override;
@@ -230,9 +270,9 @@
function ToArray: TBytes; override;
public
{$IFDEF OLD_SOCKETS}
- constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);
+ constructor Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer = 0);
{$ELSE}
- constructor Create( const ATcpClient: TSocket; const aTimeout : Longword = 0);
+ constructor Create( const aTcpClient: TSocket; const aTimeout : Longword = 0);
{$ENDIF}
end;
@@ -254,14 +294,15 @@
function GetInputStream: IThriftStream;
function GetOutputStream: IThriftStream;
- protected
+ procedure CheckReadBytesAvailable( const value : Integer); override;
+ strict protected
procedure Open; override;
procedure Close; override;
procedure Flush; override;
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
public
- constructor Create( const aInputStream, aOutputStream : IThriftStream);
+ constructor Create( const aInputStream, aOutputStream : IThriftStream; const aTransportCtl : ITransportControl = nil);
destructor Destroy; override;
property InputStream : IThriftStream read GetInputStream;
@@ -277,6 +318,7 @@
strict protected
procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
+ procedure CheckReadBytesAvailable( const value : Integer); override;
procedure Open; override;
procedure Close; override;
procedure Flush; override;
@@ -298,15 +340,19 @@
{$ENDIF}
FUseBufferedSocket : Boolean;
FOwnsServer : Boolean;
+ FTransportControl : ITransportControl;
+
strict protected
function Accept( const fnAccepting: TProc) : ITransport; override;
+ property TransportControl : ITransportControl read FTransportControl;
+
public
{$IFDEF OLD_SOCKETS}
- constructor Create( const aServer: TTcpServer; const aClientTimeout: Integer = 0); overload;
- constructor Create( const aPort: Integer; const aClientTimeout: Integer = 0; const aUseBufferedSockets: Boolean = FALSE); overload;
+ constructor Create( const aServer: TTcpServer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
+ constructor Create( const aPort: Integer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aTransportCtl : ITransportControl = nil); overload;
{$ELSE}
- constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = 0); overload;
- constructor Create( const aPort: Integer; const aClientTimeout: Longword = 0; const aUseBufferedSockets: Boolean = FALSE); overload;
+ constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
+ constructor Create( const aPort: Integer; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aTransportCtl : ITransportControl = nil); overload;
{$ENDIF}
destructor Destroy; override;
procedure Listen; override;
@@ -325,6 +371,7 @@
strict protected
function GetIsOpen: Boolean; override;
procedure Flush; override;
+ procedure CheckReadBytesAvailable( const value : Integer); override;
public
type
TFactory = class( TTransportFactoryImpl )
@@ -363,11 +410,11 @@
public
procedure Open; override;
{$IFDEF OLD_SOCKETS}
- constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = 0); overload;
- constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = 0); overload;
+ constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
+ constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
{$ELSE}
- constructor Create( const aClient: TSocket; const aOwnsClient: Boolean); overload;
- constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = 0); overload;
+ constructor Create(const aClient: TSocket; const aOwnsClient: Boolean; const aTransportCtl : ITransportControl = nil); overload;
+ constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
{$ENDIF}
destructor Destroy; override;
procedure Close; override;
@@ -402,6 +449,7 @@
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
procedure Flush; override;
+ procedure CheckReadBytesAvailable( const value : Integer); override;
public
type
TFactory = class( TTransportFactoryImpl )
@@ -409,20 +457,67 @@
function GetTransport( const aTransport: ITransport): ITransport; override;
end;
+ constructor Create( const aTransportCtl : ITransportControl); overload;
constructor Create( const aTransport: ITransport); overload;
destructor Destroy; override;
end;
const
- DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
DEFAULT_THRIFT_SECUREPROTOCOLS = [ TSecureProtocol.TLS_1_1, TSecureProtocol.TLS_1_2];
implementation
+{ TTransportControlImpl }
+
+constructor TTransportControlImpl.Create( const aMaxMessageSize : Integer);
+begin
+ inherited Create;
+
+ if aMaxMessageSize > 0
+ then FMaxAllowedMsgSize := aMaxMessageSize
+ else FMaxAllowedMsgSize := DEFAULT_MAX_MESSAGE_SIZE;
+
+ ResetConsumedMessageSize;
+end;
+
+function TTransportControlImpl.MaxAllowedMessageSize : Integer;
+begin
+ result := FMaxAllowedMsgSize;
+end;
+
+procedure TTransportControlImpl.ResetConsumedMessageSize;
+begin
+ FRemainingMsgSize := MaxAllowedMessageSize;
+end;
+
+
+procedure TTransportControlImpl.ConsumeReadBytes( const count : Integer);
+begin
+ if FRemainingMsgSize >= count
+ then Dec( FRemainingMsgSize, count)
+ else begin
+ FRemainingMsgSize := 0;
+ if FRemainingMsgSize < count
+ then raise TTransportExceptionEndOfFile.Create('Maximum message size reached');
+ end;
+end;
+
+
{ TTransportImpl }
+constructor TTransportImpl.Create( const aTransportCtl : ITransportControl);
+begin
+ inherited Create;
+
+ if aTransportCtl <> nil
+ then FTransportControl := aTransportCtl
+ else FTransportControl := TTransportControlImpl.Create;
+ ASSERT( FTransportControl <> nil);
+end;
+
+
procedure TTransportImpl.Flush;
begin
// nothing to do
@@ -477,6 +572,19 @@
end;
+function TTransportImpl.TransportControl : ITransportControl;
+begin
+ result := FTransportControl;
+end;
+
+
+procedure TTransportImpl.ConsumeReadBytes( const count : Integer);
+begin
+ if FTransportControl <> nil
+ then FTransportControl.ConsumeReadBytes( count);
+end;
+
+
{ TTransportException }
constructor TTransportException.HiddenCreate(const Msg: string);
@@ -571,13 +679,15 @@
{ TServerSocket }
{$IFDEF OLD_SOCKETS}
-constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer);
+constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer; const aTransportCtl : ITransportControl);
{$ELSE}
-constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword);
+constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword; const aTransportCtl : ITransportControl);
{$ENDIF}
begin
inherited Create;
FServer := aServer;
+ FTransportControl := aTransportCtl;
+ ASSERT( FTransportControl <> nil);
{$IFDEF OLD_SOCKETS}
FClientTimeout := aClientTimeout;
@@ -589,13 +699,18 @@
{$IFDEF OLD_SOCKETS}
-constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; const aUseBufferedSockets: Boolean);
+constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; aUseBufferedSockets: Boolean; const aTransportCtl : ITransportControl);
{$ELSE}
-constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; const aUseBufferedSockets: Boolean);
+constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; aUseBufferedSockets: Boolean; const aTransportCtl : ITransportControl);
{$ENDIF}
begin
inherited Create;
+ if aTransportCtl <> nil
+ then FTransportControl := aTransportCtl
+ else FTransportControl := TTransportControlImpl.Create;
+ ASSERT( FTransportControl <> nil);
+
{$IFDEF OLD_SOCKETS}
FPort := aPort;
FClientTimeout := aClientTimeout;
@@ -657,7 +772,7 @@
Exit;
end;
- trans := TSocketImpl.Create( client, TRUE, FClientTimeout);
+ trans := TSocketImpl.Create( client, TRUE, FClientTimeout, TransportControl);
client := nil; // trans owns it now
if FUseBufferedSocket
@@ -676,7 +791,7 @@
client := FServer.Accept;
try
- trans := TSocketImpl.Create(client, MaxMessageSize, True);
+ trans := TSocketImpl.Create(client, True, TransportControl);
client := nil;
if FUseBufferedSocket then
@@ -725,9 +840,9 @@
{ TSocket }
{$IFDEF OLD_SOCKETS}
-constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer);
+constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer; const aTransportCtl : ITransportControl);
{$ELSE}
-constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean);
+constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean; const aTransportCtl : ITransportControl);
{$ENDIF}
var stream : IThriftStream;
begin
@@ -741,16 +856,16 @@
{$ENDIF}
stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
- inherited Create( stream, stream);
+ inherited Create( stream, stream, aTransportCtl);
end;
{$IFDEF OLD_SOCKETS}
-constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer);
+constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer; const aTransportCtl : ITransportControl);
{$ELSE}
-constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword);
+constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword; const aTransportCtl : ITransportControl);
{$ENDIF}
begin
- inherited Create(nil,nil);
+ inherited Create(nil,nil, aTransportCtl);
FHost := aHost;
FPort := aPort;
FTimeout := aTimeout;
@@ -928,6 +1043,22 @@
end;
+procedure TBufferedStreamImpl.CheckReadBytesAvailable( const value : Integer);
+var nRequired : Integer;
+begin
+ nRequired := value;
+
+ if FReadBuffer <> nil then begin
+ Dec( nRequired, (FReadBuffer.Position - FReadBuffer.Size));
+ if nRequired <= 0 then Exit;
+ end;
+
+ if FStream <> nil
+ then FStream.CheckReadBytesAvailable( nRequired)
+ else raise TTransportExceptionEndOfFile.Create('Not enough input data');
+end;
+
+
function TBufferedStreamImpl.ToArray: TBytes;
var len : Integer;
begin
@@ -963,9 +1094,9 @@
{ TStreamTransportImpl }
-constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream);
+constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aTransportCtl : ITransportControl);
begin
- inherited Create;
+ inherited Create( aTransportCtl);
FInputStream := aInputStream;
FOutputStream := aOutputStream;
end;
@@ -1018,6 +1149,7 @@
then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
Result := FInputStream.Read( pBuf,buflen, off, len );
+ ConsumeReadBytes( result);
end;
procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
@@ -1028,13 +1160,20 @@
FOutputStream.Write( pBuf, off, len );
end;
+procedure TStreamTransportImpl.CheckReadBytesAvailable( const value : Integer);
+begin
+ if FInputStream <> nil
+ then FInputStream.CheckReadBytesAvailable( value)
+ else raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
+end;
+
{ TBufferedTransportImpl }
constructor TBufferedTransportImpl.Create( const aTransport : IStreamTransport; const aBufSize: Integer);
begin
ASSERT( aTransport <> nil);
- inherited Create;
+ inherited Create( aTransport.TransportControl);
FTransport := aTransport;
FBufSize := aBufSize;
InitBuffers;
@@ -1094,6 +1233,23 @@
end;
end;
+procedure TBufferedTransportImpl.CheckReadBytesAvailable( const value : Integer);
+var stm2 : IThriftStream2;
+ need : Integer;
+begin
+ need := value;
+
+ // buffered bytes
+ if Supports( FInputBuffer, IThriftStream2, stm2) then begin
+ Dec( need, stm2.Size - stm2.Position);
+ if need <= 0 then Exit;
+ end;
+
+ if FInputBuffer <> nil
+ then FInputBuffer.CheckReadBytesAvailable( need)
+ else raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
+end;
+
{ TBufferedTransportImpl.TFactory }
function TBufferedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
@@ -1104,10 +1260,18 @@
{ TFramedTransportImpl }
+constructor TFramedTransportImpl.Create( const aTransportCtl : ITransportControl);
+begin
+ inherited Create( aTransportCtl);
+
+ InitMaxFrameSize;
+ InitWriteBuffer;
+end;
+
constructor TFramedTransportImpl.Create( const aTransport: ITransport);
begin
ASSERT( aTransport <> nil);
- inherited Create;
+ inherited Create( aTransport.TransportControl);
InitMaxFrameSize;
InitWriteBuffer;
@@ -1122,8 +1286,15 @@
end;
procedure TFramedTransportImpl.InitMaxFrameSize;
+var maxLen : Integer;
begin
FMaxFrameSize := DEFAULT_MAX_LENGTH;
+
+ // MaxAllowedMessageSize may be smaller, but not larger
+ if TransportControl <> nil then begin
+ maxLen := TransportControl.MaxAllowedMessageSize - SizeOf(TFramedHeader);
+ FMaxFrameSize := Min( FMaxFrameSize, maxLen);
+ end;
end;
procedure TFramedTransportImpl.Close;
@@ -1225,6 +1396,7 @@
raise TTransportExceptionCorruptedData.Create('Frame size ('+IntToStr(size)+') larger than allowed maximum ('+IntToStr(FMaxFrameSize)+')');
end;
+ FTransport.CheckReadBytesAvailable( size);
SetLength( buff, size );
FTransport.ReadAll( buff, 0, size );
@@ -1247,6 +1419,18 @@
end;
+procedure TFramedTransportImpl.CheckReadBytesAvailable( const value : Integer);
+var nRemaining : Int64;
+begin
+ if FReadBuffer = nil
+ then raise TTransportExceptionEndOfFile.Create('Cannot read from null inputstream');
+
+ nRemaining := FReadBuffer.Size - FReadBuffer.Position;
+ if value > nRemaining
+ then raise TTransportExceptionEndOfFile.Create('Not enough input data');
+end;
+
+
{ TFramedTransport.TFactory }
function TFramedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
@@ -1577,6 +1761,10 @@
{$ENDIF}
+procedure TTcpSocketStreamImpl.CheckReadBytesAvailable( const value : Integer);
+begin
+ // we can't really tell, no further checks possible
+end;
end.
diff --git a/lib/delphi/src/Thrift.WinHTTP.pas b/lib/delphi/src/Thrift.WinHTTP.pas
index 854d7c0..dc29bec 100644
--- a/lib/delphi/src/Thrift.WinHTTP.pas
+++ b/lib/delphi/src/Thrift.WinHTTP.pas
@@ -486,7 +486,7 @@
IWinHTTPConnection = interface;
IWinHTTPRequest = interface
- ['{F65952F2-2F3B-47DC-B524-F1694E6D2AD7}']
+ ['{7862DC7C-3128-4AA1-B9B0-0EB0FE8B15B9}']
function Handle : HINTERNET;
function Connection : IWinHTTPConnection;
function AddRequestHeader( const aHeader : string; const addflag : DWORD = WINHTTP_ADDREQ_FLAG_ADD) : Boolean;
@@ -498,6 +498,7 @@
function FlushAndReceiveResponse : Boolean;
function ReadData( const dwRead : DWORD) : TBytes; overload;
function ReadData( const pBuf : Pointer; const dwRead : DWORD) : DWORD; overload;
+ function QueryDataAvailable : DWORD;
end;
IWinHTTPConnection = interface
@@ -616,6 +617,7 @@
function FlushAndReceiveResponse : Boolean;
function ReadData( const dwRead : DWORD) : TBytes; overload;
function ReadData( const pBuf : Pointer; const dwRead : DWORD) : DWORD; overload;
+ function QueryDataAvailable : DWORD;
public
constructor Create( const aConnection : IWinHTTPConnection;
@@ -1111,6 +1113,14 @@
end;
+function TWinHTTPRequestImpl.QueryDataAvailable : DWORD;
+begin
+ if not WinHttpQueryDataAvailable( FHandle, result)
+ then result := 0;
+end;
+
+
+
{ TWinHTTPUrlImpl }
constructor TWinHTTPUrlImpl.Create(const aUri: UnicodeString);
diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas
index a488cac..3562dab 100644
--- a/lib/delphi/test/TestClient.pas
+++ b/lib/delphi/test/TestClient.pas
@@ -1396,7 +1396,7 @@
case FSetup.endpoint of
trns_Sockets: begin
Console.WriteLine('Using sockets ('+FSetup.host+' port '+IntToStr(FSetup.port)+')');
- streamtrans := TSocketImpl.Create( FSetup.host, FSetup.port );
+ streamtrans := TSocketImpl.Create( FSetup.host, FSetup.port, DEFAULT_THRIFT_TIMEOUT);
FTransport := streamtrans;
end;
diff --git a/lib/delphi/test/TestServer.pas b/lib/delphi/test/TestServer.pas
index 2a80d52..a9c71fb 100644
--- a/lib/delphi/test/TestServer.pas
+++ b/lib/delphi/test/TestServer.pas
@@ -585,7 +585,7 @@
trns_Sockets : begin
Console.WriteLine('- sockets (port '+IntToStr(port)+')');
if (trns_Buffered in layered) then Console.WriteLine('- buffered');
- servertrans := TServerSocketImpl.Create( Port, 0, (trns_Buffered in layered));
+ servertrans := TServerSocketImpl.Create( Port, DEFAULT_THRIFT_TIMEOUT, (trns_Buffered in layered));
end;
trns_MsxmlHttp,
diff --git a/lib/delphi/test/serializer/TestSerializer.Tests.pas b/lib/delphi/test/serializer/TestSerializer.Tests.pas
index ec8d86d..fe69f4e 100644
--- a/lib/delphi/test/serializer/TestSerializer.Tests.pas
+++ b/lib/delphi/test/serializer/TestSerializer.Tests.pas
@@ -69,6 +69,9 @@
class procedure Deserialize( const input : TBytes; const target : IBase; const factory : TFactoryPair); overload;
class procedure Deserialize( const input : TStream; const target : IBase; const factory : TFactoryPair); overload;
+ class procedure ValidateReadToEnd( const input : TBytes; const serial : TDeserializer); overload;
+ class procedure ValidateReadToEnd( const input : TStream; const serial : TDeserializer); overload;
+
procedure Test_Serializer_Deserializer;
procedure Test_OneOfEach( const method : TMethod; const factory : TFactoryPair; const stream : TFileStream);
procedure Test_CompactStruct( const method : TMethod; const factory : TFactoryPair; const stream : TFileStream);
@@ -305,9 +308,10 @@
class procedure TTestSerializer.Deserialize( const input : TBytes; const target : IBase; const factory : TFactoryPair);
var serial : TDeserializer;
begin
- serial := TDeserializer.Create( factory.prot, factory.trans);
+ serial := TDeserializer.Create( factory.prot, factory.trans, Length(input));
try
serial.Deserialize( input, target);
+ ValidateReadToEnd( input, serial);
finally
serial.Free;
end;
@@ -317,13 +321,44 @@
class procedure TTestSerializer.Deserialize( const input : TStream; const target : IBase; const factory : TFactoryPair);
var serial : TDeserializer;
begin
- serial := TDeserializer.Create( factory.prot, factory.trans);
+ serial := TDeserializer.Create( factory.prot, factory.trans, input.Size);
try
serial.Deserialize( input, target);
+ ValidateReadToEnd( input, serial);
finally
serial.Free;
end;
end;
+class procedure TTestSerializer.ValidateReadToEnd( const input : TBytes; const serial : TDeserializer);
+// we should not have any more byte to read
+var dummy : IBase;
+begin
+ try
+ dummy := TOneOfEachImpl.Create;
+ serial.Deserialize( input, dummy);
+ raise EInOutError.Create('Expected exception not thrown?');
+ except
+ on e:TTransportExceptionEndOfFile do {expected};
+ on e:Exception do raise; // unexpected
+ end;
+end;
+
+
+class procedure TTestSerializer.ValidateReadToEnd( const input : TStream; const serial : TDeserializer);
+// we should not have any more byte to read
+var dummy : IBase;
+begin
+ try
+ input.Position := 0;
+ dummy := TOneOfEachImpl.Create;
+ serial.Deserialize( input, dummy);
+ raise EInOutError.Create('Expected exception not thrown?');
+ except
+ on e:TTransportExceptionEndOfFile do {expected};
+ on e:Exception do raise; // unexpected
+ end;
+end;
+
end.