THRIFT-2696 Unable to stop socket server while there are idle clients
Client: Delphi
Patch: Jens Geyer & Severian Duchenko
diff --git a/lib/delphi/src/Thrift.Server.pas b/lib/delphi/src/Thrift.Server.pas
index 8237a47..2fb5c90 100644
--- a/lib/delphi/src/Thrift.Server.pas
+++ b/lib/delphi/src/Thrift.Server.pas
@@ -325,6 +325,17 @@
       InputProtocol := nil;
       OutputProtocol := nil;
 
+      // close any old connections before waiting for new clients
+      if client <> nil then try
+        try
+          client.Close;
+        finally
+          client := nil;
+        end;
+      except
+        // catch all, we can't do much about it at this point
+      end;
+
       client := FServerTransport.Accept( procedure
                                          begin
                                            if FServerEvents <> nil

diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index d2816c9..eb4e8e3 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -29,7 +29,7 @@
   Thrift.Stream;
 
 const
-  DEFAULT_THRIFT_PIPE_TIMEOUT = 5 * 1000; // ms
+  DEFAULT_THRIFT_PIPE_TIMEOUT = DEFAULT_THRIFT_TIMEOUT deprecated 'use DEFAULT_THRIFT_TIMEOUT';
 
 
 
@@ -57,7 +57,7 @@
     function IsOpen: Boolean; override;
     function ToArray: TBytes; override;
   public
-    constructor Create( aEnableOverlapped : Boolean; const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);
+    constructor Create( aEnableOverlapped : Boolean; const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT);
     destructor Destroy;  override;
   end;
 
@@ -76,7 +76,7 @@
                         const aEnableOverlapped : Boolean;
                         const aShareMode: DWORD = 0;
                         const aSecurityAttributes: PSecurityAttributes = nil;
-                        const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);  overload;
+                        const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT);  overload;
   end;
 
 
@@ -90,7 +90,7 @@
   public
     constructor Create( const aPipeHandle : THandle;
                         const aOwnsHandle, aEnableOverlapped : Boolean;
-                        const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);  overload;
+                        const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT);  overload;
     destructor Destroy;  override;
   end;
 
@@ -120,7 +120,7 @@
     constructor Create( const aPipeName : string;
                         const aShareMode: DWORD = 0;
                         const aSecurityAttributes: PSecurityAttributes = nil;
-                        const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);  overload;
+                        const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT);  overload;
   end;
 
 
@@ -131,7 +131,7 @@
     // ITransport
     procedure Close; override;
     constructor Create( aPipe : THandle; aOwnsHandle : Boolean;
-                        const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT); reintroduce;
+                        const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT); reintroduce;
   end;
 
 
@@ -260,7 +260,7 @@
 
 
 constructor TPipeStreamBase.Create( aEnableOverlapped : Boolean;
-                                    const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);
+                                    const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT);
 begin
   inherited Create;
   ASSERT( aTimeout > 0);
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
index 69d74a3..bc66c64 100644
--- a/lib/delphi/src/Thrift.Transport.pas
+++ b/lib/delphi/src/Thrift.Transport.pas
@@ -27,7 +27,7 @@
   Classes,

   SysUtils,

   Math,

-  Sockets,

+  Sockets, WinSock,

   Generics.Collections,

   Thrift.Collections,

   Thrift.Utils,

@@ -152,9 +152,15 @@
   end;

 

   TTcpSocketStreamImpl = class( TThriftStreamImpl )

+  private type

+    TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);

   private

     FTcpClient : TCustomIpClient;

     FTimeout : Integer;

+    function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;

+                     TimeOut: Integer; var wsaError : Integer): Integer;

+    function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;

+                          var wsaError : Integer): TWaitForData;

   protected

     procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;

     function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;

@@ -270,7 +276,7 @@
     function GetIsOpen: Boolean; override;

   public

     procedure Open; override;

-    constructor Create( const AClient : TCustomIpClient; ATimeout: Integer = 0); overload;

+    constructor Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;

     constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;

     destructor Destroy; override;

     procedure Close; override;

@@ -318,6 +324,10 @@
 procedure TFramedTransportImpl_Initialize;

 {$IFEND}

 

+const

+  DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms

+

+

 implementation

 

 { TTransportImpl }

@@ -564,13 +574,14 @@
       'No underlying server socket.');

   end;

 

+  client := nil;

   try

     client := TCustomIpClient.Create(nil);

 

     if Assigned(fnAccepting)

     then fnAccepting();

 

-    if ( not FServer.Accept( client)) then

+    if not FServer.Accept( client) then

     begin

       client.Free;

       Result := nil;

@@ -583,14 +594,16 @@
       Exit;

     end;

 

-    trans := TSocketImpl.Create( client, FClientTimeout);

+    trans := TSocketImpl.Create( client, TRUE, FClientTimeout);

+    client := nil;  // trans owns it now

+

     if FUseBufferedSocket

     then result := TBufferedTransportImpl.Create( trans)

     else result := trans;

 

   except

-    on E: Exception do

-    begin

+    on E: Exception do begin

+      client.Free;

       raise TTransportException.Create( E.ToString );

     end;

   end;

@@ -628,11 +641,12 @@
 

 { TSocket }

 

-constructor TSocketImpl.Create( const AClient : TCustomIpClient; ATimeout: Integer = 0);

+constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);

 var stream : IThriftStream;

 begin

   FClient := AClient;

   FTimeout := ATimeout;

+  FOwnsClient := aOwnsClient;

   stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);

   inherited Create( stream, stream);

 end;

@@ -648,37 +662,30 @@
 

 destructor TSocketImpl.Destroy;

 begin

-  if FOwnsClient then

-  begin

-    FClient.Free;

-  end;

+  if FOwnsClient

+  then FreeAndNil( FClient);

   inherited;

 end;

 

 procedure TSocketImpl.Close;

 begin

   inherited Close;

-  if FClient <> nil

+  if FOwnsClient

   then FreeAndNil( FClient);

 end;

 

 function TSocketImpl.GetIsOpen: Boolean;

 begin

-  Result := False;

-  if FClient <> nil then

-  begin

-    Result := FClient.Connected;

-  end;

+  Result := (FClient <> nil) and FClient.Connected;

 end;

 

 procedure TSocketImpl.InitSocket;

 var

   stream : IThriftStream;

 begin

-  if (FClient <> nil) and FOwnsClient then begin

-    FClient.Free;

-    FClient := nil;

-  end;

+  if FOwnsClient

+  then FreeAndNil( FClient)

+  else FClient := nil;

 

   FClient := TTcpClient.Create( nil);

   FOwnsClient := True;

@@ -1188,17 +1195,144 @@
   FTcpClient.Open;

 end;

 

-function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset,

-  count: Integer): Integer;

+

+function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;

+                                      TimeOut: Integer; var wsaError : Integer): Integer;

+var  // select() on the single socket of FTcpClient; a nil flag pointer leaves that fd_set out of the call

+  ReadFds: TFDset;

+  ReadFdsptr: PFDset;  // each *ptr stays nil when the matching flag pointer is nil

+  WriteFds: TFDset;

+  WriteFdsptr: PFDset;

+  ExceptFds: TFDset;

+  ExceptFdsptr: PFDset;

+  tv: timeval;

+  Timeptr: PTimeval;

+  socket : TSocket;

+begin

+  if not FTcpClient.Active then begin  // inactive client: fail early, the output flags are left untouched

+    wsaError := WSAEINVAL;

+    Exit( SOCKET_ERROR);

+  end;

+

+  socket := FTcpClient.Handle;

+

+  if Assigned(ReadReady) then

+  begin

+    ReadFdsptr := @ReadFds;

+    FD_ZERO(ReadFds);

+    FD_SET(socket, ReadFds);

+  end

+  else

+    ReadFdsptr := nil;

+

+  if Assigned(WriteReady) then

+  begin

+    WriteFdsptr := @WriteFds;

+    FD_ZERO(WriteFds);

+    FD_SET(socket, WriteFds);

+  end

+  else

+    WriteFdsptr := nil;

+

+  if Assigned(ExceptFlag) then

+  begin

+    ExceptFdsptr := @ExceptFds;

+    FD_ZERO(ExceptFds);

+    FD_SET(socket, ExceptFds);

+  end

+  else

+    ExceptFdsptr := nil;

+

+  if TimeOut >= 0 then  // TimeOut is in milliseconds; 0 polls, a negative value means no time limit

+  begin

+    tv.tv_sec := TimeOut div 1000;

+    tv.tv_usec :=  1000 * (TimeOut mod 1000);  // remaining milliseconds -> microseconds

+    Timeptr := @tv;

+  end

+  else

+    Timeptr := nil;  // wait forever

+

+  wsaError := 0;  // stays 0 unless select() itself returns SOCKET_ERROR

+  try

+{$IFDEF MSWINDOWS}

+    result := WinSock.select(socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);

+{$ENDIF}

+{$IFDEF LINUX}

+    result := Libc.select(socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);

+{$ENDIF}

+    if result = SOCKET_ERROR

+    then wsaError := WSAGetLastError;  // NOTE(review): WinSock call is also used in the LINUX build path - confirm

+

+  except  // any exception raised by the call is swallowed and reported as SOCKET_ERROR (wsaError remains 0)

+    result := SOCKET_ERROR;

+  end;

+

+  if Assigned(ReadReady) then  // flags are written back even when the call failed; callers check the result first

+    ReadReady^ := FD_ISSET(socket, ReadFds);

+  if Assigned(WriteReady) then

+    WriteReady^ := FD_ISSET(socket, WriteFds);

+  if Assigned(ExceptFlag) then

+    ExceptFlag^ := FD_ISSET(socket, ExceptFds);

+end;

+

+function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;

+                                           DesiredBytes : Integer;

+                                           var wsaError : Integer): TWaitForData;

+var bCanRead, bError : Boolean;  // bError is filled in by Select but not evaluated in this function

+    retval : Integer;

+begin  // waits up to TimeOut for readability, then peeks: wfd_HaveData only if DesiredBytes bytes could be peeked

+  // The select function returns the total number of socket handles that are ready

+  // and contained in the fd_set structures, zero if the time limit expired,

+  // or SOCKET_ERROR if an error occurred. In the SOCKET_ERROR case Self.Select

+  // has already stored the error code in wsaError.

+  retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);

+  if retval = SOCKET_ERROR

+  then Exit( TWaitForData.wfd_Error);

+  if (retval = 0) or not bCanRead

+  then Exit( TWaitForData.wfd_Timeout);

+

+  // recv() returns the number of bytes received, or -1 if an error occurred.

+  // The return value will be 0 when the peer has performed an orderly shutdown.

+  retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, WinSock.MSG_PEEK);  // peek only: pBuf is filled, nothing is consumed

+  if retval <= 0  // shutdown (0) and failure (-1) both become wfd_Error; wsaError is not updated here

+  then Exit( TWaitForData.wfd_Error);

+

+  // Enough data ready to be read? A partial amount is reported as wfd_Timeout.

+  if retval = DesiredBytes

+  then result := TWaitForData.wfd_HaveData

+  else result := TWaitForData.wfd_Timeout;

+end;

+

+function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset, count: Integer): Integer;

+var wfd : TWaitForData;    // Read: waits until "count" bytes can be peeked, then receives them into buffer[offset..]

+    wsaError : Integer;    // returns the ReceiveBuf result, or 0 on socket error; raises TimedOut once FTimeout > 0 expires

+    pDest : Pointer;

+const

+  SLEEP_TIME = 200;  // ms; wait interval per loop pass when no timeout is configured (FTimeout <= 0)

 begin

   inherited;

 

-  if (FTimeout > 0) then begin

-    if not FTcpClient.WaitForData(FTimeout)

-    then Exit(0);

+  pDest := Pointer(@buffer[offset]);

+

+  while TRUE do begin  // left via Break (data ready), Exit (error) or the TimedOut exception

+    if FTimeout > 0

+    then wfd := WaitForData( FTimeout,   pDest, count, wsaError)

+    else wfd := WaitForData( SLEEP_TIME, pDest, count, wsaError);

+

+    case wfd of

+      TWaitForData.wfd_Error    :  Exit(0);  // socket error or peer shutdown: report 0 bytes read

+      TWaitForData.wfd_HaveData :  Break;

+      TWaitForData.wfd_Timeout  :  begin  // with no timeout configured the loop simply polls again

+        if (FTimeout > 0)

+        then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut,

+                                               SysErrorMessage(WSAETIMEDOUT));  // wsaError is always 0 on this path

+      end;

+    else

+      ASSERT( FALSE);

+    end;

   end;

 

-  result := FTcpClient.ReceiveBuf( Pointer(@buffer[offset])^, count);

+  Result := FTcpClient.ReceiveBuf( pDest^, count);

 end;

 

 function TTcpSocketStreamImpl.ToArray: TBytes;

@@ -1220,8 +1354,27 @@
 end;

 

 procedure TTcpSocketStreamImpl.Write(const buffer: TBytes; offset, count: Integer);

+var bCanWrite, bError : Boolean;

+    retval, wsaError : Integer;

 begin

   inherited;

+

+  if not FTcpClient.Active

+  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen);

+

+  // Wait until the socket is writable. A timeout <= 0 means "no timeout" (as in Read),

+  // so it is handed to Select as -1 (wait forever) instead of as a zero-length poll

+  // that would raise TimedOut right away whenever the socket is not instantly writable.

+  // Select returns 0 if the time limit expired, or SOCKET_ERROR (code in wsaError).

+  retval := Self.Select( nil, @bCanWrite, @bError, IfThen( FTimeOut > 0, FTimeOut, -1), wsaError);

+  if retval = SOCKET_ERROR

+  then raise TTransportException.Create( TTransportException.TExceptionType.Unknown,

+                                         SysErrorMessage(Cardinal(wsaError)));

+  if (retval = 0)

+  then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut);

+  if bError or not bCanWrite

+  then raise TTransportException.Create( TTransportException.TExceptionType.Unknown);

+

   FTcpClient.SendBuf( Pointer(@buffer[offset])^, count);

 end;

 

diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas
index 5e4d91c..d587e46 100644
--- a/lib/delphi/test/TestClient.pas
+++ b/lib/delphi/test/TestClient.pas
@@ -125,7 +125,7 @@
 const
   // pipe timeouts to be used
   DEBUG_TIMEOUT   = 30 * 1000;
-  RELEASE_TIMEOUT = DEFAULT_THRIFT_PIPE_TIMEOUT;
+  RELEASE_TIMEOUT = DEFAULT_THRIFT_TIMEOUT;
   TIMEOUT         = RELEASE_TIMEOUT;
 begin
   bBuffered := False;;
diff --git a/lib/delphi/test/TestServer.pas b/lib/delphi/test/TestServer.pas
index c40c507..9d06e8e 100644
--- a/lib/delphi/test/TestServer.pas
+++ b/lib/delphi/test/TestServer.pas
@@ -499,7 +499,7 @@
 const
   // pipe timeouts to be used
   DEBUG_TIMEOUT   = 30 * 1000;
-  RELEASE_TIMEOUT = DEFAULT_THRIFT_PIPE_TIMEOUT;  // server-side default
+  RELEASE_TIMEOUT = DEFAULT_THRIFT_TIMEOUT;  // server-side default
   TIMEOUT         = RELEASE_TIMEOUT;
 begin
   try
@@ -590,7 +590,7 @@
     else begin
       Console.WriteLine('- sockets (port '+IntToStr(port)+')');
       if UseBufferedSockets then Console.WriteLine('- buffered sockets');
-      servertrans := TServerSocketImpl.Create( Port, 5000, UseBufferedSockets);
+      servertrans := TServerSocketImpl.Create( Port, 0, UseBufferedSockets);
     end;
     ASSERT( servertrans <> nil);