THRIFT-3733 Socket timeout improvements
Client: Delphi
Patch: Jens Geyer
Socket timeout improvements, plus some code cleanup and preparation for "new" Delphi sockets.
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
index 9fd52a8..88de6f4 100644
--- a/lib/delphi/src/Thrift.Transport.pas
+++ b/lib/delphi/src/Thrift.Transport.pas
@@ -160,11 +160,19 @@
end;
{$IFDEF OLD_SOCKETS}
+ TThriftCustomIpClient = TCustomIpClient;
+ TThriftTcpServer = TTcpServer;
+ TThriftTcpClient = TTcpClient;
+ {$ELSE}
+ // TODO
+ {$ENDIF}
+
+ {$IFDEF OLD_SOCKETS}
TTcpSocketStreamImpl = class( TThriftStreamImpl )
private type
TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
private
- FTcpClient : TCustomIpClient;
+ FTcpClient : TThriftCustomIpClient;
FTimeout : Integer;
function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
TimeOut: Integer; var wsaError : Integer): Integer;
@@ -180,7 +188,7 @@
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
public
- constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);
+ constructor Create( const ATcpClient: TThriftCustomIpClient; const aTimeout : Integer = 0);
end;
{$ENDIF}
@@ -236,7 +244,7 @@
{$IFDEF OLD_SOCKETS}
TServerSocketImpl = class( TServerTransportImpl)
private
- FServer : TTcpServer;
+ FServer : TThriftTcpServer;
FPort : Integer;
FClientTimeout : Integer;
FUseBufferedSocket : Boolean;
@@ -244,7 +252,7 @@
protected
function Accept( const fnAccepting: TProc) : ITransport; override;
public
- constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
+ constructor Create( const AServer: TThriftTcpServer; AClientTimeout: Integer = 0); overload;
constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
destructor Destroy; override;
procedure Listen; override;
@@ -278,7 +286,7 @@
{$IFDEF OLD_SOCKETS}
TSocketImpl = class(TStreamTransportImpl)
private
- FClient : TCustomIpClient;
+ FClient : TThriftCustomIpClient;
FOwnsClient : Boolean;
FHost : string;
FPort : Integer;
@@ -289,11 +297,11 @@
function GetIsOpen: Boolean; override;
public
procedure Open; override;
- constructor Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
+ constructor Create( const AClient : TThriftCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
destructor Destroy; override;
procedure Close; override;
- property TcpClient: TCustomIpClient read FClient;
+ property TcpClient: TThriftCustomIpClient read FClient;
property Host : string read FHost;
property Port: Integer read FPort;
end;
@@ -459,19 +467,16 @@
function THTTPClientImpl.Read( var buf: TBytes; off, len: Integer): Integer;
begin
- if FInputStream = nil then
- begin
+ if FInputStream = nil then begin
raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
- 'No request has been sent');
+ 'No request has been sent');
end;
+
try
Result := FInputStream.Read( buf, off, len )
except
- on E: Exception do
- begin
- raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
- E.Message);
- end;
+ on E: Exception
+ do raise TTransportException.Create( TTransportException.TExceptionType.Unknown, E.Message);
end;
end;
@@ -546,7 +551,7 @@
{ TServerSocket }
{$IFDEF OLD_SOCKETS}
-constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
+constructor TServerSocketImpl.Create( const AServer: TThriftTcpServer; AClientTimeout: Integer);
begin
inherited Create;
FServer := AServer;
@@ -560,7 +565,7 @@
FClientTimeout := AClientTimeout;
FUseBufferedSocket := AUseBufferedSockets;
FOwnsServer := True;
- FServer := TTcpServer.Create( nil );
+ FServer := TThriftTcpServer.Create( nil );
FServer.BlockMode := bmBlocking;
{$IF CompilerVersion >= 21.0}
FServer.LocalPort := AnsiString( IntToStr( FPort));
@@ -580,31 +585,28 @@
function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
var
- client : TCustomIpClient;
+ client : TThriftCustomIpClient;
trans : IStreamTransport;
begin
- if FServer = nil then
- begin
+ if FServer = nil then begin
raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
- 'No underlying server socket.');
+ 'No underlying server socket.');
end;
client := nil;
try
- client := TCustomIpClient.Create(nil);
+ client := TThriftCustomIpClient.Create(nil);
if Assigned(fnAccepting)
then fnAccepting();
- if not FServer.Accept( client) then
- begin
+ if not FServer.Accept( client) then begin
client.Free;
Result := nil;
Exit;
end;
- if client = nil then
- begin
+ if client = nil then begin
Result := nil;
Exit;
end;
@@ -641,22 +643,18 @@
procedure TServerSocketImpl.Close;
begin
- if FServer <> nil then
- begin
- try
- FServer.Active := False;
- except
- on E: Exception do
- begin
- raise TTransportException.Create('Error on closing socket : ' + E.Message);
- end;
- end;
+ if FServer <> nil
+ then try
+ FServer.Active := False;
+ except
+ on E: Exception
+ do raise TTransportException.Create('Error on closing socket : ' + E.Message);
end;
end;
{ TSocket }
-constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
+constructor TSocketImpl.Create( const AClient : TThriftCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
var stream : IThriftStream;
begin
FClient := AClient;
@@ -702,7 +700,7 @@
then FreeAndNil( FClient)
else FClient := nil;
- FClient := TTcpClient.Create( nil);
+ FClient := TThriftTcpClient.Create( nil);
FOwnsClient := True;
stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
@@ -712,28 +710,23 @@
procedure TSocketImpl.Open;
begin
- if IsOpen then
- begin
+ if IsOpen then begin
raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen,
- 'Socket already connected');
+ 'Socket already connected');
end;
- if FHost = '' then
- begin
+ if FHost = '' then begin
raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
- 'Cannot open null host');
+ 'Cannot open null host');
end;
- if Port <= 0 then
- begin
+ if Port <= 0 then begin
raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
- 'Cannot open without port');
+ 'Cannot open without port');
end;
- if FClient = nil then
- begin
- InitSocket;
- end;
+ if FClient = nil
+ then InitSocket;
FClient.RemoteHost := TSocketHost( Host);
FClient.RemotePort := TSocketPort( IntToStr( Port));
@@ -877,21 +870,8 @@
procedure TStreamTransportImpl.Close;
begin
- if FInputStream <> FOutputStream then
- begin
- if FInputStream <> nil then
- begin
- FInputStream := nil;
- end;
- if FOutputStream <> nil then
- begin
- FOutputStream := nil;
- end;
- end else
- begin
- FInputStream := nil;
- FOutputStream := nil;
- end;
+ FInputStream := nil;
+ FOutputStream := nil;
end;
constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
@@ -910,9 +890,9 @@
procedure TStreamTransportImpl.Flush;
begin
- if FOutputStream = nil then
- begin
- raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot flush null outputstream' );
+ if FOutputStream = nil then begin
+ raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+ 'Cannot flush null outputstream' );
end;
FOutputStream.Flush;
@@ -940,18 +920,19 @@
function TStreamTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
begin
- if FInputStream = nil then
- begin
- raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot read from null inputstream' );
+ if FInputStream = nil then begin
+ raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+ 'Cannot read from null inputstream' );
end;
+
Result := FInputStream.Read( buf, off, len );
end;
procedure TStreamTransportImpl.Write(const buf: TBytes; off, len: Integer);
begin
- if FOutputStream = nil then
- begin
- raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot write to null outputstream' );
+ if FOutputStream = nil then begin
+ raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+ 'Cannot write to null outputstream' );
end;
FOutputStream.Write( buf, off, len );
@@ -1190,7 +1171,7 @@
FTcpClient.Close;
end;
-constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
+constructor TTcpSocketStreamImpl.Create( const ATcpClient: TThriftCustomIpClient; const aTimeout : Integer);
begin
inherited Create;
FTcpClient := ATcpClient;
@@ -1325,33 +1306,43 @@
var wfd : TWaitForData;
wsaError, nBytes : Integer;
pDest : PByte;
-const
- SLEEP_TIME = 200;
+ msecs : Integer;
begin
inherited;
+ if FTimeout > 0
+ then msecs := FTimeout
+ else msecs := DEFAULT_THRIFT_TIMEOUT;
+
result := 0;
pDest := Pointer(@buffer[offset]);
while count > 0 do begin
while TRUE do begin
- if FTimeout > 0
- then wfd := WaitForData( FTimeout, pDest, count, wsaError, nBytes)
- else wfd := WaitForData( SLEEP_TIME, pDest, count, wsaError, nBytes);
-
+ wfd := WaitForData( msecs, pDest, count, wsaError, nBytes);
case wfd of
TWaitForData.wfd_Error : Exit;
TWaitForData.wfd_HaveData : Break;
TWaitForData.wfd_Timeout : begin
- if (FTimeout > 0)
- then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut,
- SysErrorMessage(Cardinal(wsaError)));
+ if (FTimeout = 0)
+ then Exit
+ else begin
+ raise TTransportException.Create( TTransportException.TExceptionType.TimedOut,
+ SysErrorMessage(Cardinal(wsaError)));
+
+ end;
end;
else
ASSERT( FALSE);
end;
end;
+ // reduce the timeout once we got data
+ if FTimeout > 0
+ then msecs := FTimeout div 10
+ else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
+ msecs := Max( msecs, 200);
+
ASSERT( nBytes <= count);
nBytes := FTcpClient.ReceiveBuf( pDest^, nBytes);
Inc( pDest, nBytes);