THRIFT-5009 Serializer implementation lacks support for layered transports
Client: Delphi
Patch: Jens Geyer
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
index 3067bcd..bede57c 100644
--- a/lib/delphi/src/Thrift.Transport.pas
+++ b/lib/delphi/src/Thrift.Transport.pas
@@ -200,7 +200,7 @@
end;
TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
- function GetTransport( const ATrans: ITransport): ITransport; virtual;
+ function GetTransport( const aTransport: ITransport): ITransport; virtual;
end;
TTcpSocketStreamImpl = class( TThriftStreamImpl )
@@ -253,17 +253,19 @@
function GetInputStream: IThriftStream;
function GetOutputStream: IThriftStream;
- public
- property InputStream : IThriftStream read GetInputStream;
- property OutputStream : IThriftStream read GetOutputStream;
+ protected
procedure Open; override;
procedure Close; override;
procedure Flush; override;
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
- constructor Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
+ public
+ constructor Create( const aInputStream, aOutputStream : IThriftStream);
destructor Destroy; override;
+
+ property InputStream : IThriftStream read GetInputStream;
+ property OutputStream : IThriftStream read GetOutputStream;
end;
TBufferedStreamImpl = class( TThriftStreamImpl)
@@ -281,7 +283,7 @@
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
public
- constructor Create( const AStream: IThriftStream; ABufSize: Integer);
+ constructor Create( const aStream: IThriftStream; const aBufSize : Integer);
destructor Destroy; override;
end;
@@ -300,11 +302,11 @@
function Accept( const fnAccepting: TProc) : ITransport; override;
public
{$IFDEF OLD_SOCKETS}
- constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
- constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
+ constructor Create( const aServer: TTcpServer; const aClientTimeout: Integer = 0); overload;
+ constructor Create( const aPort: Integer; const aClientTimeout: Integer = 0; const aUseBufferedSockets: Boolean = FALSE); overload;
{$ELSE}
- constructor Create( const AServer: TServerSocket; AClientTimeout: Longword = 0); overload;
- constructor Create( APort: Integer; AClientTimeout: Longword = 0; AUseBufferedSockets: Boolean = FALSE); overload;
+ constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = 0); overload;
+ constructor Create( const aPort: Integer; const aClientTimeout: Longword = 0; const aUseBufferedSockets: Boolean = FALSE); overload;
{$ENDIF}
destructor Destroy; override;
procedure Listen; override;
@@ -324,12 +326,17 @@
function GetIsOpen: Boolean; override;
procedure Flush; override;
public
+ type
+ TFactory = class( TTransportFactoryImpl )
+ public
+ function GetTransport( const aTransport: ITransport): ITransport; override;
+ end;
+
+ constructor Create( const aTransport : IStreamTransport; const aBufSize: Integer = 1024);
procedure Open(); override;
procedure Close(); override;
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
- constructor Create( const ATransport : IStreamTransport ); overload;
- constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;
property UnderlyingTransport: ITransport read GetUnderlyingTransport;
property IsOpen: Boolean read GetIsOpen;
end;
@@ -356,11 +363,11 @@
public
procedure Open; override;
{$IFDEF OLD_SOCKETS}
- constructor Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
- constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
+ constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = 0); overload;
+ constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = 0); overload;
{$ELSE}
- constructor Create(const AClient: TSocket; aOwnsClient: Boolean); overload;
- constructor Create( const AHost: string; APort: Integer; ATimeout: Longword = 0); overload;
+ constructor Create( const aClient: TSocket; const aOwnsClient: Boolean); overload;
+ constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = 0); overload;
{$ENDIF}
destructor Destroy; override;
procedure Close; override;
@@ -387,16 +394,6 @@
procedure InitMaxFrameSize;
procedure InitWriteBuffer;
procedure ReadFrame;
- public
- type
- TFactory = class( TTransportFactoryImpl )
- public
- function GetTransport( const ATrans: ITransport): ITransport; override;
- end;
-
- constructor Create; overload;
- constructor Create( const ATrans: ITransport); overload;
- destructor Destroy; override;
procedure Open(); override;
function GetIsOpen: Boolean; override;
@@ -405,6 +402,15 @@
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
procedure Flush; override;
+ public
+ type
+ TFactory = class( TTransportFactoryImpl )
+ public
+ function GetTransport( const aTransport: ITransport): ITransport; override;
+ end;
+
+ constructor Create( const aTransport: ITransport); overload;
+ destructor Destroy; override;
end;
@@ -412,10 +418,9 @@
DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
DEFAULT_THRIFT_SECUREPROTOCOLS = [ TSecureProtocol.TLS_1_1, TSecureProtocol.TLS_1_2];
-
-
implementation
+
{ TTransportImpl }
procedure TTransportImpl.Flush;
@@ -442,18 +447,6 @@
else result := 0;
end;
-procedure TTransportImpl.Write( const buf: TBytes);
-begin
- if Length(buf) > 0
- then Write( @buf[0], 0, Length(buf));
-end;
-
-procedure TTransportImpl.Write( const buf: TBytes; off: Integer; len: Integer);
-begin
- if Length(buf) > 0
- then Write( @buf[0], off, len);
-end;
-
function TTransportImpl.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
var ret : Integer;
begin
@@ -466,11 +459,24 @@
end;
end;
+procedure TTransportImpl.Write( const buf: TBytes);
+begin
+ if Length(buf) > 0
+ then Write( @buf[0], 0, Length(buf));
+end;
+
+procedure TTransportImpl.Write( const buf: TBytes; off: Integer; len: Integer);
+begin
+ if Length(buf) > 0
+ then Write( @buf[0], off, len);
+end;
+
procedure TTransportImpl.Write( const pBuf : Pointer; len : Integer);
begin
Self.Write( pBuf, 0, len);
end;
+
{ TTransportException }
constructor TTransportException.HiddenCreate(const Msg: string);
@@ -478,17 +484,17 @@
inherited Create(Msg);
end;
-class function TTransportException.Create(AType: TExceptionType): TTransportException;
+class function TTransportException.Create(aType: TExceptionType): TTransportException;
begin
//no inherited;
{$WARN SYMBOL_DEPRECATED OFF}
- Result := Create(AType, '')
+ Result := Create(aType, '')
{$WARN SYMBOL_DEPRECATED DEFAULT}
end;
class function TTransportException.Create(aType: TExceptionType; const msg: string): TTransportException;
begin
- case AType of
+ case aType of
TExceptionType.NotOpen: Result := TTransportExceptionNotOpen.Create(msg);
TExceptionType.AlreadyOpen: Result := TTransportExceptionAlreadyOpen.Create(msg);
TExceptionType.TimedOut: Result := TTransportExceptionTimedOut.Create(msg);
@@ -557,40 +563,44 @@
{ TTransportFactoryImpl }
-function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
+function TTransportFactoryImpl.GetTransport( const aTransport: ITransport): ITransport;
begin
- Result := ATrans;
+ Result := aTransport;
end;
{ TServerSocket }
{$IFDEF OLD_SOCKETS}
-constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
-begin
- inherited Create;
- FServer := AServer;
- FClientTimeout := AClientTimeout;
-end;
+constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer);
{$ELSE}
-constructor TServerSocketImpl.Create( const AServer: TServerSocket; AClientTimeout: Longword);
+constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword);
+{$ENDIF}
begin
inherited Create;
- FServer := AServer;
- FServer.RecvTimeout := AClientTimeout;
- FServer.SendTimeout := AClientTimeout;
-end;
-{$ENDIF}
+ FServer := aServer;
{$IFDEF OLD_SOCKETS}
-constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
+ FClientTimeout := aClientTimeout;
{$ELSE}
-constructor TServerSocketImpl.Create(APort: Integer; AClientTimeout: Longword; AUseBufferedSockets: Boolean);
+ FServer.RecvTimeout := aClientTimeout;
+ FServer.SendTimeout := aClientTimeout;
+{$ENDIF}
+end;
+
+
+{$IFDEF OLD_SOCKETS}
+constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; const aUseBufferedSockets: Boolean);
+{$ELSE}
+constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; const aUseBufferedSockets: Boolean);
{$ENDIF}
begin
inherited Create;
+
{$IFDEF OLD_SOCKETS}
- FPort := APort;
- FClientTimeout := AClientTimeout;
+ FPort := aPort;
+ FClientTimeout := aClientTimeout;
+
+ FOwnsServer := True;
FServer := TTcpServer.Create( nil );
FServer.BlockMode := bmBlocking;
{$IF CompilerVersion >= 21.0}
@@ -599,10 +609,11 @@
FServer.LocalPort := IntToStr( FPort);
{$IFEND}
{$ELSE}
- FServer := TServerSocket.Create(APort, AClientTimeout, AClientTimeout);
-{$ENDIF}
- FUseBufferedSocket := AUseBufferedSockets;
FOwnsServer := True;
+ FServer := TServerSocket.Create(aPort, aClientTimeout, aClientTimeout);
+{$ENDIF}
+
+ FUseBufferedSocket := aUseBufferedSockets;
end;
destructor TServerSocketImpl.Destroy;
@@ -665,7 +676,7 @@
client := FServer.Accept;
try
- trans := TSocketImpl.Create(client, True);
+ trans := TSocketImpl.Create(client, True);  // (aClient, aOwnsClient) is the only TSocket-based overload declared
client := nil;
if FUseBufferedSocket then
@@ -714,37 +725,35 @@
{ TSocket }
{$IFDEF OLD_SOCKETS}
-constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
+constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer);
+{$ELSE}
+constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean);
+{$ENDIF}
var stream : IThriftStream;
begin
- FClient := AClient;
- FTimeout := ATimeout;
+ FClient := aClient;
FOwnsClient := aOwnsClient;
+
+{$IFDEF OLD_SOCKETS}
+ FTimeout := aTimeout;
+{$ELSE}
+ FTimeout := aClient.RecvTimeout;
+{$ENDIF}
+
stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
inherited Create( stream, stream);
end;
-{$ELSE}
-constructor TSocketImpl.Create(const AClient: TSocket; aOwnsClient: Boolean);
-var stream : IThriftStream;
-begin
- FClient := AClient;
- FTimeout := AClient.RecvTimeout;
- FOwnsClient := aOwnsClient;
- stream := TTcpSocketStreamImpl.Create(FClient, FTimeout);
- inherited Create(stream, stream);
-end;
-{$ENDIF}
{$IFDEF OLD_SOCKETS}
-constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
+constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer);
{$ELSE}
-constructor TSocketImpl.Create(const AHost: string; APort: Integer; ATimeout: Longword);
+constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword);
{$ENDIF}
begin
inherited Create(nil,nil);
- FHost := AHost;
- FPort := APort;
- FTimeout := ATimeout;
+ FHost := aHost;
+ FPort := aPort;
+ FTimeout := aTimeout;
InitSocket;
end;
@@ -839,11 +848,11 @@
FWriteBuffer := nil;
end;
-constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
+constructor TBufferedStreamImpl.Create( const aStream: IThriftStream; const aBufSize : Integer);
begin
inherited Create;
- FStream := AStream;
- FBufSize := ABufSize;
+ FStream := aStream;
+ FBufSize := aBufSize;
FReadBuffer := TMemoryStream.Create;
FWriteBuffer := TMemoryStream.Create;
end;
@@ -918,6 +927,7 @@
end;
end;
+
function TBufferedStreamImpl.ToArray: TBytes;
var len : Integer;
begin
@@ -953,11 +963,11 @@
{ TStreamTransportImpl }
-constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
+constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream);
begin
inherited Create;
- FInputStream := AInputStream;
- FOutputStream := AOutputStream;
+ FInputStream := aInputStream;
+ FOutputStream := aOutputStream;
end;
destructor TStreamTransportImpl.Destroy;
@@ -1004,35 +1014,29 @@
function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
- if FInputStream = nil then begin
- raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
- end;
+ if FInputStream = nil
+ then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
Result := FInputStream.Read( pBuf,buflen, off, len );
end;
procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
- if FOutputStream = nil then begin
- raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );
- end;
+ if FOutputStream = nil
+ then raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );
FOutputStream.Write( pBuf, off, len );
end;
+
{ TBufferedTransportImpl }
-constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
+constructor TBufferedTransportImpl.Create( const aTransport : IStreamTransport; const aBufSize: Integer);
begin
- //no inherited;
- Create( ATransport, 1024 );
-end;
-
-constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport; ABufSize: Integer);
-begin
+ ASSERT( aTransport <> nil);
inherited Create;
- FTransport := ATransport;
- FBufSize := ABufSize;
+ FTransport := aTransport;
+ FBufSize := aBufSize;
InitBuffers;
end;
@@ -1040,7 +1044,7 @@
begin
FTransport.Close;
FInputBuffer := nil;
- FOutputBuffer := nil;
+ FOutputBuffer := nil;
end;
procedure TBufferedTransportImpl.Flush;
@@ -1078,10 +1082,9 @@
function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
- Result := 0;
- if FInputBuffer <> nil then begin
- Result := FInputBuffer.Read( pBuf,buflen, off, len );
- end;
+ if FInputBuffer <> nil
+ then Result := FInputBuffer.Read( pBuf,buflen, off, len)
+ else Result := 0;
end;
procedure TBufferedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
@@ -1091,23 +1094,24 @@
end;
end;
-{ TFramedTransportImpl }
+{ TBufferedTransportImpl.TFactory }
-constructor TFramedTransportImpl.Create;
+function TBufferedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
begin
- inherited Create;
-
- InitMaxFrameSize;
- InitWriteBuffer;
+ Result := TBufferedTransportImpl.Create( aTransport as IStreamTransport);  // buffered factory must wrap in a buffered transport, not a framed one
end;
-constructor TFramedTransportImpl.Create( const ATrans: ITransport);
+
+{ TFramedTransportImpl }
+
+constructor TFramedTransportImpl.Create( const aTransport: ITransport);
begin
+ ASSERT( aTransport <> nil);
inherited Create;
InitMaxFrameSize;
InitWriteBuffer;
- FTransport := ATrans;
+ FTransport := aTransport;
end;
destructor TFramedTransportImpl.Destroy;
@@ -1189,9 +1193,7 @@
if (FReadBuffer <> nil) and (len > 0) then begin
result := FReadBuffer.Read( pTmp^, len);
- if result > 0 then begin
- Exit;
- end;
+ if result > 0 then Exit;
end;
ReadFrame;
@@ -1225,7 +1227,8 @@
SetLength( buff, size );
FTransport.ReadAll( buff, 0, size );
- FReadBuffer.Free;
+
+ FreeAndNil( FReadBuffer);
FReadBuffer := TMemoryStream.Create;
if Length(buff) > 0
then FReadBuffer.Write( Pointer(@buff[0])^, size );
@@ -1243,11 +1246,12 @@
end;
end;
+
{ TFramedTransport.TFactory }
-function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
+function TFramedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
begin
- Result := TFramedTransportImpl.Create( ATrans );
+ Result := TFramedTransportImpl.Create( aTransport);
end;
{ TTcpSocketStreamImpl }
@@ -1258,17 +1262,17 @@
end;
{$IFDEF OLD_SOCKETS}
-constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
+constructor TTcpSocketStreamImpl.Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer);
begin
inherited Create;
- FTcpClient := ATcpClient;
+ FTcpClient := aTcpClient;
FTimeout := aTimeout;
end;
{$ELSE}
-constructor TTcpSocketStreamImpl.Create( const ATcpClient: TSocket; const aTimeout : Longword);
+constructor TTcpSocketStreamImpl.Create( const aTcpClient: TSocket; const aTimeout : Longword);
begin
inherited Create;
- FTcpClient := ATcpClient;
+ FTcpClient := aTcpClient;
if aTimeout = 0 then
FTcpClient.RecvTimeout := SLEEP_TIME
else
@@ -1574,12 +1578,5 @@
{$ENDIF}
-{$IF CompilerVersion < 21.0}
-initialization
-begin
- TFramedTransportImpl_Initialize;
-end;
-{$IFEND}
-
end.