THRIFT-2696 Unable to stop socket server while there are idle clients
Client: Delphi
Patch: Jens Geyer & Severian Duchenko
diff --git a/lib/delphi/src/Thrift.Server.pas b/lib/delphi/src/Thrift.Server.pas
index 8237a47..2fb5c90 100644
--- a/lib/delphi/src/Thrift.Server.pas
+++ b/lib/delphi/src/Thrift.Server.pas
@@ -325,6 +325,17 @@
InputProtocol := nil;
OutputProtocol := nil;
+ // close any old connections before before waiting for new clients
+ if client <> nil then try
+ try
+ client.Close;
+ finally
+ client := nil;
+ end;
+ except
+ // catch all, we can't do much about it at this point
+ end;
+
client := FServerTransport.Accept( procedure
begin
if FServerEvents <> nil
diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index d2816c9..eb4e8e3 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -29,7 +29,7 @@
Thrift.Stream;
const
- DEFAULT_THRIFT_PIPE_TIMEOUT = 5 * 1000; // ms
+ DEFAULT_THRIFT_PIPE_TIMEOUT = DEFAULT_THRIFT_TIMEOUT deprecated 'use DEFAULT_THRIFT_TIMEOUT';
@@ -57,7 +57,7 @@
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
public
- constructor Create( aEnableOverlapped : Boolean; const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);
+ constructor Create( aEnableOverlapped : Boolean; const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT);
destructor Destroy; override;
end;
@@ -76,7 +76,7 @@
const aEnableOverlapped : Boolean;
const aShareMode: DWORD = 0;
const aSecurityAttributes: PSecurityAttributes = nil;
- const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT); overload;
+ const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT); overload;
end;
@@ -90,7 +90,7 @@
public
constructor Create( const aPipeHandle : THandle;
const aOwnsHandle, aEnableOverlapped : Boolean;
- const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT); overload;
+ const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT); overload;
destructor Destroy; override;
end;
@@ -120,7 +120,7 @@
constructor Create( const aPipeName : string;
const aShareMode: DWORD = 0;
const aSecurityAttributes: PSecurityAttributes = nil;
- const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT); overload;
+ const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT); overload;
end;
@@ -131,7 +131,7 @@
// ITransport
procedure Close; override;
constructor Create( aPipe : THandle; aOwnsHandle : Boolean;
- const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT); reintroduce;
+ const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT); reintroduce;
end;
@@ -260,7 +260,7 @@
constructor TPipeStreamBase.Create( aEnableOverlapped : Boolean;
- const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);
+ const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT);
begin
inherited Create;
ASSERT( aTimeout > 0);
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
index 69d74a3..bc66c64 100644
--- a/lib/delphi/src/Thrift.Transport.pas
+++ b/lib/delphi/src/Thrift.Transport.pas
@@ -27,7 +27,7 @@
Classes,
SysUtils,
Math,
- Sockets,
+ Sockets, WinSock,
Generics.Collections,
Thrift.Collections,
Thrift.Utils,
@@ -152,9 +152,15 @@
end;
TTcpSocketStreamImpl = class( TThriftStreamImpl )
+ private type
+ TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
private
FTcpClient : TCustomIpClient;
FTimeout : Integer;
+ function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
+ TimeOut: Integer; var wsaError : Integer): Integer;
+ function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
+ var wsaError : Integer): TWaitForData;
protected
procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
@@ -270,7 +276,7 @@
function GetIsOpen: Boolean; override;
public
procedure Open; override;
- constructor Create( const AClient : TCustomIpClient; ATimeout: Integer = 0); overload;
+ constructor Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
destructor Destroy; override;
procedure Close; override;
@@ -318,6 +324,10 @@
procedure TFramedTransportImpl_Initialize;
{$IFEND}
+const
+ DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
+
+
implementation
{ TTransportImpl }
@@ -564,13 +574,14 @@
'No underlying server socket.');
end;
+ client := nil;
try
client := TCustomIpClient.Create(nil);
if Assigned(fnAccepting)
then fnAccepting();
- if ( not FServer.Accept( client)) then
+ if not FServer.Accept( client) then
begin
client.Free;
Result := nil;
@@ -583,14 +594,16 @@
Exit;
end;
- trans := TSocketImpl.Create( client, FClientTimeout);
+ trans := TSocketImpl.Create( client, TRUE, FClientTimeout);
+ client := nil; // trans owns it now
+
if FUseBufferedSocket
then result := TBufferedTransportImpl.Create( trans)
else result := trans;
except
- on E: Exception do
- begin
+ on E: Exception do begin
+ client.Free;
raise TTransportException.Create( E.ToString );
end;
end;
@@ -628,11 +641,12 @@
{ TSocket }
-constructor TSocketImpl.Create( const AClient : TCustomIpClient; ATimeout: Integer = 0);
+constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
var stream : IThriftStream;
begin
FClient := AClient;
FTimeout := ATimeout;
+ FOwnsClient := aOwnsClient;
stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
inherited Create( stream, stream);
end;
@@ -648,37 +662,30 @@
destructor TSocketImpl.Destroy;
begin
- if FOwnsClient then
- begin
- FClient.Free;
- end;
+ if FOwnsClient
+ then FreeAndNil( FClient);
inherited;
end;
procedure TSocketImpl.Close;
begin
inherited Close;
- if FClient <> nil
+ if FOwnsClient
then FreeAndNil( FClient);
end;
function TSocketImpl.GetIsOpen: Boolean;
begin
- Result := False;
- if FClient <> nil then
- begin
- Result := FClient.Connected;
- end;
+ Result := (FClient <> nil) and FClient.Connected;
end;
procedure TSocketImpl.InitSocket;
var
stream : IThriftStream;
begin
- if (FClient <> nil) and FOwnsClient then begin
- FClient.Free;
- FClient := nil;
- end;
+ if FOwnsClient
+ then FreeAndNil( FClient)
+ else FClient := nil;
FClient := TTcpClient.Create( nil);
FOwnsClient := True;
@@ -1188,17 +1195,144 @@
FTcpClient.Open;
end;
-function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset,
- count: Integer): Integer;
+
+function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
+ TimeOut: Integer; var wsaError : Integer): Integer;
+var
+ ReadFds: TFDset;
+ ReadFdsptr: PFDset;
+ WriteFds: TFDset;
+ WriteFdsptr: PFDset;
+ ExceptFds: TFDset;
+ ExceptFdsptr: PFDset;
+ tv: timeval;
+ Timeptr: PTimeval;
+ socket : TSocket;
+begin
+ if not FTcpClient.Active then begin
+ wsaError := WSAEINVAL;
+ Exit( SOCKET_ERROR);
+ end;
+
+ socket := FTcpClient.Handle;
+
+ if Assigned(ReadReady) then
+ begin
+ ReadFdsptr := @ReadFds;
+ FD_ZERO(ReadFds);
+ FD_SET(socket, ReadFds);
+ end
+ else
+ ReadFdsptr := nil;
+
+ if Assigned(WriteReady) then
+ begin
+ WriteFdsptr := @WriteFds;
+ FD_ZERO(WriteFds);
+ FD_SET(socket, WriteFds);
+ end
+ else
+ WriteFdsptr := nil;
+
+ if Assigned(ExceptFlag) then
+ begin
+ ExceptFdsptr := @ExceptFds;
+ FD_ZERO(ExceptFds);
+ FD_SET(socket, ExceptFds);
+ end
+ else
+ ExceptFdsptr := nil;
+
+ if TimeOut >= 0 then
+ begin
+ tv.tv_sec := TimeOut div 1000;
+ tv.tv_usec := 1000 * (TimeOut mod 1000);
+ Timeptr := @tv;
+ end
+ else
+ Timeptr := nil; // wait forever
+
+ wsaError := 0;
+ try
+{$IFDEF MSWINDOWS}
+ result := WinSock.select(socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
+{$ENDIF}
+{$IFDEF LINUX}
+ result := Libc.select(socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
+{$ENDIF}
+ if result = SOCKET_ERROR
+ then wsaError := WSAGetLastError;
+
+ except
+ result := SOCKET_ERROR;
+ end;
+
+ if Assigned(ReadReady) then
+ ReadReady^ := FD_ISSET(socket, ReadFds);
+ if Assigned(WriteReady) then
+ WriteReady^ := FD_ISSET(socket, WriteFds);
+ if Assigned(ExceptFlag) then
+ ExceptFlag^ := FD_ISSET(socket, ExceptFds);
+end;
+
+function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
+ DesiredBytes : Integer;
+ var wsaError : Integer): TWaitForData;
+var bCanRead, bError : Boolean;
+ retval : Integer;
+begin
+ // The select function returns the total number of socket handles that are ready
+ // and contained in the fd_set structures, zero if the time limit expired,
+ // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
+ // WSAGetLastError can be used to retrieve a specific error code.
+ retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
+ if retval = SOCKET_ERROR
+ then Exit( TWaitForData.wfd_Error);
+ if (retval = 0) or not bCanRead
+ then Exit( TWaitForData.wfd_Timeout);
+
+ // recv() returns the number of bytes received, or -1 if an error occurred.
+ // The return value will be 0 when the peer has performed an orderly shutdown.
+ retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, WinSock.MSG_PEEK);
+ if retval <= 0
+ then Exit( TWaitForData.wfd_Error);
+
+ // Enough data ready to be read?
+ if retval = DesiredBytes
+ then result := TWaitForData.wfd_HaveData
+ else result := TWaitForData.wfd_Timeout;
+end;
+
+function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset, count: Integer): Integer;
+var wfd : TWaitForData;
+ wsaError : Integer;
+ pDest : Pointer;
+const
+ SLEEP_TIME = 200;
begin
inherited;
- if (FTimeout > 0) then begin
- if not FTcpClient.WaitForData(FTimeout)
- then Exit(0);
+ pDest := Pointer(@buffer[offset]);
+
+ while TRUE do begin
+ if FTimeout > 0
+ then wfd := WaitForData( FTimeout, pDest, count, wsaError)
+ else wfd := WaitForData( SLEEP_TIME, pDest, count, wsaError);
+
+ case wfd of
+ TWaitForData.wfd_Error : Exit(0);
+ TWaitForData.wfd_HaveData : Break;
+ TWaitForData.wfd_Timeout : begin
+ if (FTimeout > 0)
+ then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut,
+ SysErrorMessage(Cardinal(wsaError)));
+ end;
+ else
+ ASSERT( FALSE);
+ end;
end;
- result := FTcpClient.ReceiveBuf( Pointer(@buffer[offset])^, count);
+ Result := FTcpClient.ReceiveBuf( pDest^, count);
end;
function TTcpSocketStreamImpl.ToArray: TBytes;
@@ -1220,8 +1354,27 @@
end;
procedure TTcpSocketStreamImpl.Write(const buffer: TBytes; offset, count: Integer);
+var bCanWrite, bError : Boolean;
+ retval, wsaError : Integer;
begin
inherited;
+
+ if not FTcpClient.Active
+ then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen);
+
+ // The select function returns the total number of socket handles that are ready
+ // and contained in the fd_set structures, zero if the time limit expired,
+ // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
+ // WSAGetLastError can be used to retrieve a specific error code.
+ retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);
+ if retval = SOCKET_ERROR
+ then raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
+ SysErrorMessage(Cardinal(wsaError)));
+ if (retval = 0)
+ then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut);
+ if bError or not bCanWrite
+ then raise TTransportException.Create( TTransportException.TExceptionType.Unknown);
+
FTcpClient.SendBuf( Pointer(@buffer[offset])^, count);
end;
diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas
index 5e4d91c..d587e46 100644
--- a/lib/delphi/test/TestClient.pas
+++ b/lib/delphi/test/TestClient.pas
@@ -125,7 +125,7 @@
const
// pipe timeouts to be used
DEBUG_TIMEOUT = 30 * 1000;
- RELEASE_TIMEOUT = DEFAULT_THRIFT_PIPE_TIMEOUT;
+ RELEASE_TIMEOUT = DEFAULT_THRIFT_TIMEOUT;
TIMEOUT = RELEASE_TIMEOUT;
begin
bBuffered := False;;
diff --git a/lib/delphi/test/TestServer.pas b/lib/delphi/test/TestServer.pas
index c40c507..9d06e8e 100644
--- a/lib/delphi/test/TestServer.pas
+++ b/lib/delphi/test/TestServer.pas
@@ -499,7 +499,7 @@
const
// pipe timeouts to be used
DEBUG_TIMEOUT = 30 * 1000;
- RELEASE_TIMEOUT = DEFAULT_THRIFT_PIPE_TIMEOUT; // server-side default
+ RELEASE_TIMEOUT = DEFAULT_THRIFT_TIMEOUT; // server-side default
TIMEOUT = RELEASE_TIMEOUT;
begin
try
@@ -590,7 +590,7 @@
else begin
Console.WriteLine('- sockets (port '+IntToStr(port)+')');
if UseBufferedSockets then Console.WriteLine('- buffered sockets');
- servertrans := TServerSocketImpl.Create( Port, 5000, UseBufferedSockets);
+ servertrans := TServerSocketImpl.Create( Port, 0, UseBufferedSockets);
end;
ASSERT( servertrans <> nil);