THRIFT-2415 Named pipes server performance & message mode
Patch: Jens Geyer
diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index c2696f4..d2816c9 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -25,12 +25,14 @@
uses
Windows, SysUtils, Math, AccCtrl, AclAPI, SyncObjs,
Thrift.Transport,
+ Thrift.Utils,
Thrift.Stream;
const
DEFAULT_THRIFT_PIPE_TIMEOUT = 5 * 1000; // ms
+
type
//--- Pipe Streams ---
@@ -39,6 +41,7 @@
strict protected
FPipe : THandle;
FTimeout : DWORD;
+ FOverlapped : Boolean;
procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
@@ -46,25 +49,31 @@
procedure Close; override;
procedure Flush; override;
+ function ReadDirect( var buffer: TBytes; offset: Integer; count: Integer): Integer;
+ function ReadOverlapped( var buffer: TBytes; offset: Integer; count: Integer): Integer;
+ procedure WriteDirect( const buffer: TBytes; offset: Integer; count: Integer);
+ procedure WriteOverlapped( const buffer: TBytes; offset: Integer; count: Integer);
+
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
public
- constructor Create( const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);
+ constructor Create( aEnableOverlapped : Boolean; const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);
destructor Destroy; override;
end;
TNamedPipeStreamImpl = class sealed( TPipeStreamBase)
- private
+ strict private
FPipeName : string;
FShareMode : DWORD;
FSecurityAttribs : PSecurityAttributes;
- protected
+ strict protected
procedure Open; override;
public
constructor Create( const aPipeName : string;
+ const aEnableOverlapped : Boolean;
const aShareMode: DWORD = 0;
const aSecurityAttributes: PSecurityAttributes = nil;
const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT); overload;
@@ -72,14 +81,16 @@
THandlePipeStreamImpl = class sealed( TPipeStreamBase)
+ // Pipe stream that is constructed from an already existing pipe handle.
+ // The constructor additionally receives the ownership flag, the overlapped
+ // mode flag and the I/O timeout; the latter two are forwarded to TPipeStreamBase.
- private
+ strict private
FSrcHandle : THandle;
- protected
+ strict protected
procedure Open; override;
public
- constructor Create( const aPipeHandle : THandle; aOwnsHandle : Boolean); overload;
+ // NOTE: argument order is (handle, owns-handle, enable-overlapped, timeout)
+ constructor Create( const aPipeHandle : THandle;
+ const aOwnsHandle, aEnableOverlapped : Boolean;
+ const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT); overload;
destructor Destroy; override;
end;
@@ -104,7 +115,8 @@
TNamedPipeTransportClientEndImpl = class( TPipeTransportBase)
public
// Named pipe constructors
- constructor Create( aPipe : THandle; aOwnsHandle : Boolean); overload;
+ constructor Create( aPipe : THandle; aOwnsHandle : Boolean;
+ const aTimeOut : DWORD); overload;
constructor Create( const aPipeName : string;
const aShareMode: DWORD = 0;
const aSecurityAttributes: PSecurityAttributes = nil;
@@ -118,7 +130,8 @@
public
// ITransport
procedure Close; override;
- constructor Create( aPipe : THandle; aOwnsHandle : Boolean); reintroduce;
+ constructor Create( aPipe : THandle; aOwnsHandle : Boolean;
+ const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT); reintroduce;
end;
@@ -150,17 +163,20 @@
TPipeServerTransportBase = class( TServerTransportImpl)
+ // Common base of the pipe server transports.
+ // The stop request is represented by an event object instead of a Boolean flag:
+ // Close signals it, Listen resets it and QueryStopServer polls it.
- protected
- FStopServer : Boolean;
+ strict protected
+ FStopServer : TEvent;
procedure InternalClose; virtual; abstract;
+ // TRUE if the stop event is signaled, or if the event no longer exists
+ function QueryStopServer : Boolean;
public
+ constructor Create;
+ destructor Destroy; override;
procedure Listen; override;
procedure Close; override;
end;
TAnonymousPipeServerTransportImpl = class( TPipeServerTransportBase, IAnonymousPipeServerTransport)
- private
+ strict private
FBufSize : DWORD;
// Server side anonymous pipe handles
@@ -190,7 +206,7 @@
TNamedPipeServerTransportImpl = class( TPipeServerTransportBase, INamedPipeServerTransport)
- private
+ strict private
FPipeName : string;
FMaxConns : DWORD;
FBufSize : DWORD;
@@ -199,7 +215,7 @@
FConnected : Boolean;
- protected
+ strict protected
function Accept(const fnAccepting: TProc): ITransport; override;
function CreateNamedPipe : THandle;
function CreateTransportInstance : ITransport;
@@ -243,11 +259,14 @@
{ TPipeStreamBase }
-constructor TPipeStreamBase.Create( const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);
+constructor TPipeStreamBase.Create( aEnableOverlapped : Boolean;
+ const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);
begin
inherited Create;
- FPipe := INVALID_HANDLE_VALUE;
- FTimeout := aTimeOut;
+ ASSERT( aTimeout > 0);
+ FPipe := INVALID_HANDLE_VALUE;
+ FTimeout := aTimeOut;
+ FOverlapped := aEnableOverlapped;
end;
@@ -280,6 +299,22 @@
procedure TPipeStreamBase.Write(const buffer: TBytes; offset, count: Integer);
+// Entry point for all writes: forwards the request to the direct or to the
+// overlapped implementation, depending on the mode selected at construction.
+begin
+  if not FOverlapped then begin
+    WriteDirect( buffer, offset, count);
+    Exit;
+  end;
+
+  WriteOverlapped( buffer, offset, count);
+end;
+
+
+function TPipeStreamBase.Read( var buffer: TBytes; offset, count: Integer): Integer;
+// Entry point for all reads: forwards the request to the direct or to the
+// overlapped implementation, depending on the mode selected at construction,
+// and hands back whatever byte count that implementation reports.
+begin
+  if not FOverlapped then begin
+    result := ReadDirect( buffer, offset, count);
+    Exit;
+  end;
+
+  result := ReadOverlapped( buffer, offset, count);
+end;
+
+
+procedure TPipeStreamBase.WriteDirect(const buffer: TBytes; offset, count: Integer);
var cbWritten : DWORD;
begin
if not IsOpen
@@ -292,7 +327,7 @@
end;
-function TPipeStreamBase.Read( var buffer: TBytes; offset, count: Integer): Integer;
+function TPipeStreamBase.ReadDirect( var buffer: TBytes; offset, count: Integer): Integer;
var cbRead, dwErr : DWORD;
bytes, retries : LongInt;
bOk : Boolean;
@@ -338,6 +373,84 @@
end;
+procedure TPipeStreamBase.WriteOverlapped(const buffer: TBytes; offset, count: Integer);
+// Writes count bytes, starting at buffer[offset], to the pipe using overlapped I/O
+// and waits at most FTimeout milliseconds for the operation to complete.
+//
+// Raises TTransportException
+//   NotOpen   if the pipe is not open
+//   TimedOut  if the write did not complete within FTimeout
+//   Unknown   on any other failure
+//
+// A request that has not completed is cancelled before the exception is raised.
+// Otherwise the system would still reference the OVERLAPPED structure and the
+// caller's buffer after both have gone out of scope.
+var cbWritten, dwWait, dwError : DWORD;
+    overlapped : IOverlappedHelper;
+begin
+  if not IsOpen
+  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+                                         'Called write on non-open pipe');
+
+  overlapped := TOverlappedHelperImpl.Create;
+
+  if not WriteFile( FPipe, buffer[offset], count, cbWritten, overlapped.OverlappedPtr)
+  then begin
+    dwError := GetLastError;
+    case dwError of
+      ERROR_IO_PENDING : begin
+        dwWait := overlapped.WaitFor(FTimeout);
+
+        if (dwWait <> WAIT_OBJECT_0) then begin
+          // request still in flight: cancel it and wait until it is really finished.
+          // Only wait if the cancel request was accepted, to avoid blocking forever.
+          if CancelIo( FPipe)
+          then GetOverlappedResult( FPipe, overlapped.OverlappedPtr^, cbWritten, TRUE);
+
+          if (dwWait = WAIT_TIMEOUT)
+          then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut,
+                                                 'Pipe write timed out')
+          else raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
+                                                 'Pipe write error');
+        end;
+
+        // query the live OVERLAPPED record, not a by-value copy of it
+        if not GetOverlappedResult( FPipe, overlapped.OverlappedPtr^, cbWritten, TRUE)
+        then raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
+                                               'Pipe write error');
+      end;
+
+    else
+      raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
+                                        SysErrorMessage(dwError));
+    end;
+  end;
+
+  ASSERT( DWORD(count) = cbWritten);
+end;
+
+
+function TPipeStreamBase.ReadOverlapped( var buffer: TBytes; offset, count: Integer): Integer;
+// Reads up to count bytes from the pipe into buffer, starting at buffer[offset],
+// using overlapped I/O and waiting at most FTimeout milliseconds.
+//
+// Returns the number of bytes actually read. The pipe is operated in byte mode,
+// so the result may legitimately be smaller than count.
+//
+// Raises TTransportException
+//   NotOpen   if the pipe is not open
+//   TimedOut  if no data arrived within FTimeout
+//   Unknown   on any other failure
+//
+// A request that has not completed is cancelled before the exception is raised.
+// Otherwise the system would still reference the OVERLAPPED structure and the
+// caller's buffer after both have gone out of scope.
+var cbRead, dwWait, dwError : DWORD;
+    bOk : Boolean;
+    overlapped : IOverlappedHelper;
+begin
+  if not IsOpen
+  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+                                         'Called read on non-open pipe');
+
+  overlapped := TOverlappedHelperImpl.Create;
+
+  // read the data
+  bOk := ReadFile( FPipe, buffer[offset], count, cbRead, overlapped.OverlappedPtr);
+  if not bOk then begin
+    dwError := GetLastError;
+    case dwError of
+      ERROR_IO_PENDING : begin
+        dwWait := overlapped.WaitFor(FTimeout);
+
+        if (dwWait <> WAIT_OBJECT_0) then begin
+          // request still in flight: cancel it and wait until it is really finished.
+          // Only wait if the cancel request was accepted, to avoid blocking forever.
+          if CancelIo( FPipe)
+          then GetOverlappedResult( FPipe, overlapped.OverlappedPtr^, cbRead, TRUE);
+
+          if (dwWait = WAIT_TIMEOUT)
+          then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut,
+                                                 'Pipe read timed out')
+          else raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
+                                                 'Pipe read error');
+        end;
+
+        // query the live OVERLAPPED record, not a by-value copy of it
+        if not GetOverlappedResult( FPipe, overlapped.OverlappedPtr^, cbRead, TRUE)
+        then raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
+                                               'Pipe read error');
+      end;
+
+    else
+      raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
+                                        SysErrorMessage(dwError));
+    end;
+  end;
+
+  ASSERT( cbRead > 0);  // see TTransportImpl.ReadAll()
+  result := cbRead;
+end;
+
+
function TPipeStreamBase.ToArray: TBytes;
var bytes : LongInt;
begin
@@ -357,11 +470,13 @@
{ TNamedPipeStreamImpl }
-constructor TNamedPipeStreamImpl.Create( const aPipeName : string; const aShareMode: DWORD;
+constructor TNamedPipeStreamImpl.Create( const aPipeName : string;
+ const aEnableOverlapped : Boolean;
+ const aShareMode: DWORD;
const aSecurityAttributes: PSecurityAttributes;
const aTimeOut : DWORD);
begin
- inherited Create( aTimeout);
+ inherited Create( aEnableOverlapped, aTimeout);
FPipeName := aPipeName;
FShareMode := aShareMode;
@@ -374,7 +489,6 @@
procedure TNamedPipeStreamImpl.Open;
var hPipe : THandle;
- dwMode : DWORD;
begin
if IsOpen then Exit;
@@ -389,21 +503,13 @@
FShareMode, // sharing
FSecurityAttribs, // security attributes
OPEN_EXISTING, // opens existing pipe
- 0, // default attributes
+ FILE_FLAG_OVERLAPPED or FILE_FLAG_WRITE_THROUGH, // async+fast, please
0); // no template file
if hPipe = INVALID_HANDLE_VALUE
then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
'Unable to open pipe, '+SysErrorMessage(GetLastError));
- // pipe connected; change to message-read mode.
- dwMode := PIPE_READMODE_MESSAGE;
- if not SetNamedPipeHandleState( hPipe, dwMode, nil, nil) then begin
- Close;
- raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
- 'SetNamedPipeHandleState failed');
- end;
-
// everything fine
FPipe := hPipe;
end;
@@ -412,9 +518,11 @@
{ THandlePipeStreamImpl }
-constructor THandlePipeStreamImpl.Create( const aPipeHandle : THandle; aOwnsHandle : Boolean);
+constructor THandlePipeStreamImpl.Create( const aPipeHandle : THandle;
+ const aOwnsHandle, aEnableOverlapped : Boolean;
+ const aTimeOut : DWORD);
begin
- inherited Create( DEFAULT_THRIFT_PIPE_TIMEOUT);
+ inherited Create( aEnableOverlapped, aTimeOut);
if aOwnsHandle
then FSrcHandle := aPipeHandle
@@ -474,16 +582,17 @@
// Named pipe constructor
begin
inherited Create( nil, nil);
- FInputStream := TNamedPipeStreamImpl.Create( aPipeName, aShareMode, aSecurityAttributes, aTimeOut);
+ FInputStream := TNamedPipeStreamImpl.Create( aPipeName, TRUE, aShareMode, aSecurityAttributes, aTimeOut);
FOutputStream := FInputStream; // true for named pipes
end;
-constructor TNamedPipeTransportClientEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean);
+constructor TNamedPipeTransportClientEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean;
+ const aTimeOut : DWORD);
// Named pipe constructor
+// Wraps an existing named pipe handle. The same stream instance serves as
+// input and output stream; overlapped I/O is always enabled for named pipes.
begin
inherited Create( nil, nil);
- FInputStream := THandlePipeStreamImpl.Create( aPipe, aOwnsHandle);
+ // argument order is (handle, owns-handle, enable-overlapped, timeout)
+ FInputStream := THandlePipeStreamImpl.Create( aPipe, aOwnsHandle, TRUE, aTimeOut);
FOutputStream := FInputStream; // true for named pipes
end;
@@ -491,11 +600,12 @@
{ TNamedPipeTransportServerEndImpl }
-constructor TNamedPipeTransportServerEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean);
+constructor TNamedPipeTransportServerEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean;
+ const aTimeOut : DWORD);
// Named pipe constructor
+// Stores a duplicate of the pipe handle in FHandle, then passes the original
+// handle, the ownership flag and the timeout on to the inherited constructor.
begin
FHandle := DuplicatePipeHandle( aPipe);
- inherited Create( aPipe, aOwnsHandle);
+ inherited Create( aPipe, aOwnsHandle, aTimeOut);
end;
@@ -516,23 +626,48 @@
// Anonymous pipe constructor
begin
inherited Create( nil, nil);
- FInputStream := THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles);
- FOutputStream := THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles);
+ // overlapped is not supported with AnonPipes, see MSDN
+ FInputStream := THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles, FALSE);
+ FOutputStream := THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles, FALSE);
end;
{ TPipeServerTransportBase }
+constructor TPipeServerTransportBase.Create;
+// Sets up the event object that carries the "stop server" request.
+const
+  MANUAL_RESET = TRUE;   // event stays signaled until explicitly reset
+  NOT_SIGNALED = FALSE;  // initial state
+begin
+  inherited Create;
+  FStopServer := TEvent.Create( nil, MANUAL_RESET, NOT_SIGNALED, '');
+end;
+
+
+destructor TPipeServerTransportBase.Destroy;
+// Releases the stop event. The inherited destructor is run in any case,
+// even if freeing the event raises.
+begin
+  try
+    FreeAndNil( FStopServer);
+  finally
+    inherited;
+  end;
+end;
+
+
+function TPipeServerTransportBase.QueryStopServer : Boolean;
+// Non-blocking check of the stop request.
+// A missing event object is treated like a pending stop request.
+begin
+  if FStopServer = nil
+  then result := TRUE
+  else result := (FStopServer.WaitFor(0) <> wrTimeout);
+end;
+
+
procedure TPipeServerTransportBase.Listen;
+// Clears a previous stop request by resetting the stop event.
begin
- FStopServer := FALSE;
+ FStopServer.ResetEvent;
end;
procedure TPipeServerTransportBase.Close;
+// Signals the stop event first, then lets the descendant close its handles
+// via InternalClose.
begin
- FStopServer := TRUE;
+ FStopServer.SetEvent;
InternalClose;
end;
@@ -660,6 +795,7 @@
// Named Pipe CTOR
begin
inherited Create;
+ ASSERT( aTimeout > 0);
FPipeName := aPipename;
FBufsize := aBufSize;
FMaxConns := Max( 1, Min( PIPE_UNLIMITED_INSTANCES, aMaxConns));
@@ -674,61 +810,54 @@
function TNamedPipeServerTransportImpl.Accept(const fnAccepting: TProc): ITransport;
+// Waits for a client to connect and returns the transport produced by
+// CreateTransportInstance.
+//
+// Each loop iteration closes the current pipe via InternalClose, calls Abort if
+// a stop was requested, creates a new pipe instance and invokes fnAccepting
+// (if assigned) before ConnectNamedPipe is called in overlapped mode.
+// A pending connect waits on two handles at once, the overlapped event and the
+// stop event, for at most FTimeout milliseconds. If the wait ends for any reason
+// other than a completed connect, FConnected stays FALSE and the loop repeats.
+//
+// Raises TTransportException (NotOpen) if ConnectNamedPipe fails with an error
+// other than ERROR_PIPE_CONNECTED or ERROR_IO_PENDING.
var dwError, dwWait, dwDummy : DWORD;
- overlapped : TOverlapped;
- event : TEvent;
+ overlapped : IOverlappedHelper;
+ handles : array[0..1] of THandle;
begin
- FillChar( overlapped, SizeOf(overlapped), 0);
- event := TEvent.Create( nil, TRUE, FALSE, ''); // always ManualReset, see MSDN
- try
- overlapped.hEvent := event.Handle;
+ overlapped := TOverlappedHelperImpl.Create;
- ASSERT( not FConnected);
- while not FConnected do begin
- InternalClose;
- if FStopServer then Abort;
- CreateNamedPipe;
+ ASSERT( not FConnected);
+ while not FConnected do begin
+ InternalClose;
+ if QueryStopServer then Abort;
+ CreateNamedPipe;
- if Assigned(fnAccepting)
- then fnAccepting();
+ if Assigned(fnAccepting)
+ then fnAccepting();
- // Wait for the client to connect; if it succeeds, the
- // function returns a nonzero value. If the function returns
- // zero, GetLastError should return ERROR_PIPE_CONNECTED.
- if ConnectNamedPipe( Handle, @overlapped) then begin
- FConnected := TRUE;
- Break;
- end;
-
- // ConnectNamedPipe() returns FALSE for OverlappedIO, even if connected.
- // We have to check GetLastError() explicitly to find out
- dwError := GetLastError;
- case dwError of
- ERROR_PIPE_CONNECTED : begin
- FConnected := not FStopServer; // special case: pipe immediately connected
- end;
-
- ERROR_IO_PENDING : begin
- repeat
- dwWait := WaitForSingleObject( overlapped.hEvent, DEFAULT_THRIFT_PIPE_TIMEOUT);
- until (dwWait <> WAIT_TIMEOUT) or FStopServer;
- FConnected := (dwWait = WAIT_OBJECT_0)
- and GetOverlappedResult( Handle, overlapped, dwDummy, TRUE)
- and not FStopServer;
- end;
-
- else
- InternalClose;
- raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
- 'Client connection failed');
- end;
+ // Wait for the client to connect; if it succeeds, the
+ // function returns a nonzero value. If the function returns
+ // zero, GetLastError should return ERROR_PIPE_CONNECTED.
+ if ConnectNamedPipe( Handle, overlapped.OverlappedPtr) then begin
+ FConnected := TRUE;
+ Break;
end;
- // create the transport impl
- result := CreateTransportInstance;
+ // ConnectNamedPipe() returns FALSE for OverlappedIO, even if connected.
+ // We have to check GetLastError() explicitly to find out
+ dwError := GetLastError;
+ case dwError of
+ ERROR_PIPE_CONNECTED : begin
+ FConnected := not QueryStopServer; // special case: pipe immediately connected
+ end;
- finally
- event.Free;
+ ERROR_IO_PENDING : begin
+ handles[0] := overlapped.WaitHandle;
+ handles[1] := FStopServer.Handle;
+ dwWait := WaitForMultipleObjects( 2, @handles, FALSE, FTimeout);
+ FConnected := (dwWait = WAIT_OBJECT_0)
+ and GetOverlappedResult( Handle, overlapped.Overlapped, dwDummy, TRUE)
+ and not QueryStopServer;
+ end;
+
+ else
+ InternalClose;
+ raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+ 'Client connection failed');
+ end;
end;
+
+ // create the transport impl
+ result := CreateTransportInstance;
end;
@@ -739,7 +868,7 @@
hPipe := THandle( InterlockedExchangePointer( Pointer(FHandle), Pointer(INVALID_HANDLE_VALUE)));
try
FConnected := FALSE;
- result := TNamedPipeTransportServerEndImpl.Create( hPipe, TRUE);
+ result := TNamedPipeTransportServerEndImpl.Create( hPipe, TRUE, FTimeout);
except
ClosePipeHandle(hPipe);
raise;
@@ -819,8 +948,8 @@
result := Windows.CreateNamedPipe( PChar( FPipeName), // pipe name
PIPE_ACCESS_DUPLEX or // read/write access
FILE_FLAG_OVERLAPPED, // async mode
- PIPE_TYPE_MESSAGE or // message type pipe
- PIPE_READMODE_MESSAGE, // message-read mode
+ PIPE_TYPE_BYTE or // byte type pipe
+ PIPE_READMODE_BYTE, // byte read mode
FMaxConns, // max. instances
FBufSize, // output buffer size
FBufSize, // input buffer size
diff --git a/lib/delphi/src/Thrift.Utils.pas b/lib/delphi/src/Thrift.Utils.pas
index 72c0dc1..ec8190c 100644
--- a/lib/delphi/src/Thrift.Utils.pas
+++ b/lib/delphi/src/Thrift.Utils.pas
@@ -21,10 +21,40 @@
interface
+uses
+ Classes, Windows, SysUtils, SyncObjs;
+
+type
+ // Access to an OVERLAPPED record and the event handle stored in it.
+ IOverlappedHelper = interface
+ ['{A1832EFA-2E02-4884-8F09-F0A0277157FA}']
+ // returns the record by value, i.e. the caller receives a copy
+ function Overlapped : TOverlapped;
+ // returns the address of the record held by the implementing object
+ function OverlappedPtr : POverlapped;
+ // the event handle stored in the record's hEvent member
+ function WaitHandle : THandle;
+ // waits on that event handle, result is the WaitForSingleObject() return code
+ function WaitFor(dwTimeout: DWORD) : DWORD;
+ end;
+
+ // Reference-counted IOverlappedHelper implementation. It holds the OVERLAPPED
+ // record together with the event object whose handle is stored in it.
+ TOverlappedHelperImpl = class( TInterfacedObject, IOverlappedHelper)
+ strict protected
+ FOverlapped : TOverlapped;
+ FEvent : TEvent;
+
+ // IOverlappedHelper
+ function Overlapped : TOverlapped;
+ function OverlappedPtr : POverlapped;
+ function WaitHandle : THandle;
+ function WaitFor(dwTimeout: DWORD) : DWORD;
+ public
+ constructor Create;
+ destructor Destroy; override;
+ end;
+
+
+
function IfValue(B: Boolean; const TrueValue, FalseValue: WideString): string;
implementation
+
function IfValue(B: Boolean; const TrueValue, FalseValue: WideString): string;
begin
if B then
@@ -33,4 +63,56 @@
Result := FalseValue;
end;
+
+{ TOverlappedHelperImpl }
+
+constructor TOverlappedHelperImpl.Create;
+// Zeroes the OVERLAPPED record and attaches a new, non-signaled event to it.
+const
+  MANUAL_RESET = TRUE;   // always ManualReset, see MSDN
+  NOT_SIGNALED = FALSE;
+begin
+  inherited Create;
+  FillChar( FOverlapped, SizeOf(FOverlapped), 0);
+  FEvent := TEvent.Create( nil, MANUAL_RESET, NOT_SIGNALED, '');
+  FOverlapped.hEvent := FEvent.Handle;
+end;
+
+
+destructor TOverlappedHelperImpl.Destroy;
+// Detaches the event handle from the record and frees the event object.
+// The inherited destructor runs in any case.
+begin
+  try
+    FOverlapped.hEvent := 0;
+    FreeAndNil( FEvent);
+  finally
+    inherited;
+  end;
+end;
+
+
+function TOverlappedHelperImpl.Overlapped : TOverlapped;
+// hands out a copy of the record
+begin
+  Result := FOverlapped;
+end;
+
+
+function TOverlappedHelperImpl.OverlappedPtr : POverlapped;
+// hands out the address of the record held by this object
+begin
+  Result := @FOverlapped;
+end;
+
+
+function TOverlappedHelperImpl.WaitHandle : THandle;
+// the event handle as stored in the record
+begin
+  Result := FOverlapped.hEvent;
+end;
+
+
+function TOverlappedHelperImpl.WaitFor( dwTimeout : DWORD) : DWORD;
+// waits on the record's event handle, passing the wait result through unchanged
+begin
+  Result := WaitForSingleObject( FOverlapped.hEvent, dwTimeout);
+end;
+
+
+
+
end.
diff --git a/lib/delphi/test/TestServer.pas b/lib/delphi/test/TestServer.pas
index 656fa15..6aa2daf 100644
--- a/lib/delphi/test/TestServer.pas
+++ b/lib/delphi/test/TestServer.pas
@@ -499,7 +499,7 @@
const
// pipe timeouts to be used
DEBUG_TIMEOUT = 30 * 1000;
- RELEASE_TIMEOUT = 0; // server-side default
+ RELEASE_TIMEOUT = DEFAULT_THRIFT_PIPE_TIMEOUT; // server-side default
TIMEOUT = RELEASE_TIMEOUT;
begin
try