THRIFT-2696 Unable to stop socket server while there are idle clients
Client: Delphi
Patch: Severian Duchenko & Jens Geyer
The patch contains some additional refactoring, e.g. I consolidated the excessively overloaded CTORs a bit.
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
index b567aef..69d74a3 100644
--- a/lib/delphi/src/Thrift.Transport.pas
+++ b/lib/delphi/src/Thrift.Transport.pas
@@ -154,6 +154,7 @@
TTcpSocketStreamImpl = class( TThriftStreamImpl )
private
FTcpClient : TCustomIpClient;
+ FTimeout : Integer;
protected
procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
@@ -164,7 +165,7 @@
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
public
- constructor Create( const ATcpClient: TCustomIpClient);
+ constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);
end;
IStreamTransport = interface( ITransport )
@@ -226,12 +227,8 @@
protected
function Accept( const fnAccepting: TProc) : ITransport; override;
public
- constructor Create( const AServer: TTcpServer ); overload;
- constructor Create( const AServer: TTcpServer; AClientTimeout: Integer); overload;
- constructor Create( APort: Integer); overload;
- constructor Create( APort: Integer; AClientTimeout: Integer); overload;
- constructor Create( APort: Integer; AClientTimeout: Integer;
- AUseBufferedSockets: Boolean); overload;
+ constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
+ constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
destructor Destroy; override;
procedure Listen; override;
procedure Close; override;
@@ -273,9 +270,8 @@
function GetIsOpen: Boolean; override;
public
procedure Open; override;
- constructor Create( const AClient : TCustomIpClient); overload;
- constructor Create( const AHost: string; APort: Integer); overload;
- constructor Create( const AHost: string; APort: Integer; ATimeout: Integer); overload;
+ constructor Create( const AClient : TCustomIpClient; ATimeout: Integer = 0); overload;
+ constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
destructor Destroy; override;
procedure Close; override;
property TcpClient: TCustomIpClient read FClient;
@@ -532,16 +528,29 @@
FClientTimeout := AClientTimeout;
end;
-constructor TServerSocketImpl.Create( const AServer: TTcpServer);
+constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
begin
- //no inherited;
- Create( AServer, 0 );
+ inherited Create;
+ FPort := APort;
+ FClientTimeout := AClientTimeout;
+ FUseBufferedSocket := AUseBufferedSockets;
+ FOwnsServer := True;
+ FServer := TTcpServer.Create( nil );
+ FServer.BlockMode := bmBlocking;
+{$IF CompilerVersion >= 21.0}
+ FServer.LocalPort := AnsiString( IntToStr( FPort));
+{$ELSE}
+ FServer.LocalPort := IntToStr( FPort);
+{$IFEND}
end;
-constructor TServerSocketImpl.Create(APort: Integer);
+destructor TServerSocketImpl.Destroy;
begin
- //no inherited;
- Create( APort, 0 );
+ if FOwnsServer then begin
+ FServer.Free;
+ FServer := nil;
+ end;
+ inherited;
end;
function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
@@ -574,7 +583,7 @@
Exit;
end;
- trans := TSocketImpl.Create( client);
+ trans := TSocketImpl.Create( client, FClientTimeout);
if FUseBufferedSocket
then result := TBufferedTransportImpl.Create( trans)
else result := trans;
@@ -587,6 +596,21 @@
end;
end;
+procedure TServerSocketImpl.Listen;
+begin
+ if FServer <> nil then
+ begin
+ try
+ FServer.Active := True;
+ except
+ on E: Exception do
+ begin
+ raise TTransportException.Create('Could not accept on listening socket: ' + E.Message);
+ end;
+ end;
+ end;
+end;
+
procedure TServerSocketImpl.Close;
begin
if FServer <> nil then
@@ -602,77 +626,17 @@
end;
end;
-constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer;
- AUseBufferedSockets: Boolean);
-begin
- inherited Create;
- FPort := APort;
- FClientTimeout := AClientTimeout;
- FUseBufferedSocket := AUseBufferedSockets;
- FOwnsServer := True;
- FServer := TTcpServer.Create( nil );
- FServer.BlockMode := bmBlocking;
-{$IF CompilerVersion >= 21.0}
- FServer.LocalPort := AnsiString( IntToStr( FPort));
-{$ELSE}
- FServer.LocalPort := IntToStr( FPort);
-{$IFEND}
-end;
-
-destructor TServerSocketImpl.Destroy;
-begin
- if FOwnsServer then
- begin
- FServer.Free;
- end;
- inherited;
-end;
-
-procedure TServerSocketImpl.Listen;
-begin
- if FServer <> nil then
- begin
- try
- FServer.Active := True;
- except
- on E: Exception do
- begin
- raise TTransportException.Create('Could not accept on listening socket: ' + E.Message);
- end;
- end;
- end;
-end;
-
-constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer);
-begin
- //no inherited;
- Create( APort, AClientTimeout, False );
-end;
-
{ TSocket }
-constructor TSocketImpl.Create( const AClient : TCustomIpClient);
-var
- stream : IThriftStream;
+constructor TSocketImpl.Create( const AClient : TCustomIpClient; ATimeout: Integer = 0);
+var stream : IThriftStream;
begin
FClient := AClient;
- stream := TTcpSocketStreamImpl.Create( FClient);
+ FTimeout := ATimeout;
+ stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
inherited Create( stream, stream);
end;
-constructor TSocketImpl.Create(const AHost: string; APort: Integer);
-begin
- //no inherited;
- Create( AHost, APort, 0);
-end;
-
-procedure TSocketImpl.Close;
-begin
- inherited Close;
- if FClient <> nil
- then FreeAndNil( FClient);
-end;
-
constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
begin
inherited Create(nil,nil);
@@ -691,6 +655,13 @@
inherited;
end;
+procedure TSocketImpl.Close;
+begin
+ inherited Close;
+ if FClient <> nil
+ then FreeAndNil( FClient);
+end;
+
function TSocketImpl.GetIsOpen: Boolean;
begin
Result := False;
@@ -704,21 +675,17 @@
var
stream : IThriftStream;
begin
- if FClient <> nil then
- begin
- if FOwnsClient then
- begin
- FClient.Free;
- FClient := nil;
- end;
+ if (FClient <> nil) and FOwnsClient then begin
+ FClient.Free;
+ FClient := nil;
end;
- FClient := TTcpClient.Create( nil );
+
+ FClient := TTcpClient.Create( nil);
FOwnsClient := True;
- stream := TTcpSocketStreamImpl.Create( FClient);
+ stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
FInputStream := stream;
FOutputStream := stream;
-
end;
procedure TSocketImpl.Open;
@@ -750,7 +717,7 @@
FClient.RemotePort := TSocketPort( IntToStr( Port));
FClient.Connect;
- FInputStream := TTcpSocketStreamImpl.Create( FClient);
+ FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
FOutputStream := FInputStream;
end;
@@ -1199,10 +1166,11 @@
FTcpClient.Close;
end;
-constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient);
+constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
begin
inherited Create;
FTcpClient := ATcpClient;
+ FTimeout := aTimeout;
end;
procedure TTcpSocketStreamImpl.Flush;
@@ -1224,7 +1192,13 @@
count: Integer): Integer;
begin
inherited;
- Result := FTcpClient.ReceiveBuf( Pointer(@buffer[offset])^, count);
+
+ if (FTimeout > 0) then begin
+ if not FTcpClient.WaitForData(FTimeout)
+ then Exit(0);
+ end;
+
+ result := FTcpClient.ReceiveBuf( Pointer(@buffer[offset])^, count);
end;
function TTcpSocketStreamImpl.ToArray: TBytes;
diff --git a/lib/delphi/test/TestServer.pas b/lib/delphi/test/TestServer.pas
index 6aa2daf..c40c507 100644
--- a/lib/delphi/test/TestServer.pas
+++ b/lib/delphi/test/TestServer.pas
@@ -590,7 +590,7 @@
else begin
Console.WriteLine('- sockets (port '+IntToStr(port)+')');
if UseBufferedSockets then Console.WriteLine('- buffered sockets');
- servertrans := TServerSocketImpl.Create( Port, 0, UseBufferedSockets);
+ servertrans := TServerSocketImpl.Create( Port, 5000, UseBufferedSockets);
end;
ASSERT( servertrans <> nil);