THRIFT-2415 Named pipes server performance & message mode

Patch: Jens Geyer
diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index c2696f4..d2816c9 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -25,12 +25,14 @@
 uses
   Windows, SysUtils, Math, AccCtrl, AclAPI, SyncObjs,
   Thrift.Transport,
+  Thrift.Utils,
   Thrift.Stream;
 
 const
   DEFAULT_THRIFT_PIPE_TIMEOUT = 5 * 1000; // ms
 
 
+
 type
   //--- Pipe Streams ---
 
@@ -39,6 +41,7 @@
   strict protected
     FPipe    : THandle;
     FTimeout : DWORD;
+    FOverlapped : Boolean;
 
     procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
     function  Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
@@ -46,25 +49,31 @@
     procedure Close; override;
     procedure Flush; override;
 
+    function  ReadDirect(     var buffer: TBytes; offset: Integer; count: Integer): Integer;
+    function  ReadOverlapped( var buffer: TBytes; offset: Integer; count: Integer): Integer;
+    procedure WriteDirect(     const buffer: TBytes; offset: Integer; count: Integer);
+    procedure WriteOverlapped( const buffer: TBytes; offset: Integer; count: Integer);
+
     function IsOpen: Boolean; override;
     function ToArray: TBytes; override;
   public
-    constructor Create( const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);
+    constructor Create( aEnableOverlapped : Boolean; const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);
     destructor Destroy;  override;
   end;
 
 
   TNamedPipeStreamImpl = class sealed( TPipeStreamBase)
-  private
+  strict private
     FPipeName  : string;
     FShareMode : DWORD;
     FSecurityAttribs : PSecurityAttributes;
 
-  protected
+  strict protected
     procedure Open; override;
 
   public
     constructor Create( const aPipeName : string;
+                        const aEnableOverlapped : Boolean;
                         const aShareMode: DWORD = 0;
                         const aSecurityAttributes: PSecurityAttributes = nil;
                         const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);  overload;
@@ -72,14 +81,16 @@
 
 
   THandlePipeStreamImpl = class sealed( TPipeStreamBase)
-  private
+  strict private
     FSrcHandle : THandle;
 
-  protected
+  strict protected
     procedure Open; override;
 
   public
-    constructor Create( const aPipeHandle : THandle; aOwnsHandle : Boolean);  overload;
+    constructor Create( const aPipeHandle : THandle;
+                        const aOwnsHandle, aEnableOverlapped : Boolean;
+                        const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);  overload;
     destructor Destroy;  override;
   end;
 
@@ -104,7 +115,8 @@
   TNamedPipeTransportClientEndImpl = class( TPipeTransportBase)
   public
     // Named pipe constructors
-    constructor Create( aPipe : THandle; aOwnsHandle : Boolean); overload;
+    constructor Create( aPipe : THandle; aOwnsHandle : Boolean;
+                        const aTimeOut : DWORD); overload;
     constructor Create( const aPipeName : string;
                         const aShareMode: DWORD = 0;
                         const aSecurityAttributes: PSecurityAttributes = nil;
@@ -118,7 +130,8 @@
   public
     // ITransport
     procedure Close; override;
-    constructor Create( aPipe : THandle; aOwnsHandle : Boolean); reintroduce;
+    constructor Create( aPipe : THandle; aOwnsHandle : Boolean;
+                        const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT); reintroduce;
   end;
 
 
@@ -150,17 +163,20 @@
 
 
   TPipeServerTransportBase = class( TServerTransportImpl)
-  protected
-    FStopServer   : Boolean;
+  strict protected
+    FStopServer : TEvent;
     procedure InternalClose; virtual; abstract;
+    function QueryStopServer : Boolean;
   public
+    constructor Create;
+    destructor Destroy;  override;
     procedure Listen; override;
     procedure Close; override;
   end;
 
 
   TAnonymousPipeServerTransportImpl = class( TPipeServerTransportBase, IAnonymousPipeServerTransport)
-  private
+  strict private
     FBufSize      : DWORD;
 
     // Server side anonymous pipe handles
@@ -190,7 +206,7 @@
 
 
   TNamedPipeServerTransportImpl = class( TPipeServerTransportBase, INamedPipeServerTransport)
-  private
+  strict private
     FPipeName     : string;
     FMaxConns     : DWORD;
     FBufSize      : DWORD;
@@ -199,7 +215,7 @@
     FConnected    : Boolean;
 
 
-  protected
+  strict protected
     function Accept(const fnAccepting: TProc): ITransport; override;
     function CreateNamedPipe : THandle;
     function CreateTransportInstance : ITransport;
@@ -243,11 +259,14 @@
 { TPipeStreamBase }
 
 
-constructor TPipeStreamBase.Create( const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);
+constructor TPipeStreamBase.Create( aEnableOverlapped : Boolean;
+                                    const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);
 begin
   inherited Create;
-  FPipe    := INVALID_HANDLE_VALUE;
-  FTimeout := aTimeOut;
+  ASSERT( aTimeout > 0);  // debug-time guard: a zero timeout is rejected
+  FPipe       := INVALID_HANDLE_VALUE;  // no pipe handle attached yet
+  FTimeout    := aTimeOut;  // passed to overlapped.WaitFor() by ReadOverlapped/WriteOverlapped
+  FOverlapped := aEnableOverlapped;  // Read/Write dispatch to the *Overlapped or *Direct variants on this flag
 end;
 
 
@@ -280,6 +299,22 @@
 
 
 procedure TPipeStreamBase.Write(const buffer: TBytes; offset, count: Integer);
+begin  // forward to the write variant selected by FOverlapped at construction time
+  if not FOverlapped
+  then WriteDirect( buffer, offset, count)
+  else WriteOverlapped( buffer, offset, count);
+end;
+
+
+function TPipeStreamBase.Read( var buffer: TBytes; offset, count: Integer): Integer;
+begin  // forward to the read variant selected by FOverlapped at construction time
+  if not FOverlapped
+  then result := ReadDirect( buffer, offset, count)
+  else result := ReadOverlapped( buffer, offset, count);
+end;
+
+
+procedure TPipeStreamBase.WriteDirect(const buffer: TBytes; offset, count: Integer);
 var cbWritten : DWORD;
 begin
   if not IsOpen
@@ -292,7 +327,7 @@
 end;
 
 
-function TPipeStreamBase.Read( var buffer: TBytes; offset, count: Integer): Integer;
+function TPipeStreamBase.ReadDirect( var buffer: TBytes; offset, count: Integer): Integer;
 var cbRead, dwErr  : DWORD;
     bytes, retries  : LongInt;
     bOk     : Boolean;
@@ -338,6 +373,84 @@
 end;
 
 
+procedure TPipeStreamBase.WriteOverlapped(const buffer: TBytes; offset, count: Integer);
+// Writes count bytes starting at buffer[offset] using overlapped I/O, waiting up to FTimeout.
+// Raises TTransportException: NotOpen, TimedOut, or Unknown for any other failure.
+var cbWritten, dwWait, dwError : DWORD;
+    overlapped : IOverlappedHelper;
+begin
+  if not IsOpen
+  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+                                         'Called write on non-open pipe');
+  overlapped := TOverlappedHelperImpl.Create;
+
+  if not WriteFile( FPipe, buffer[offset], count, cbWritten, overlapped.OverlappedPtr)
+  then begin
+    dwError := GetLastError;
+    case dwError of
+      ERROR_IO_PENDING : begin
+        dwWait := overlapped.WaitFor(FTimeout);
+        if (dwWait = WAIT_TIMEOUT) then begin
+          CancelIo( FPipe);  // request is still pending: cancel it before the overlapped helper is released
+          raise TTransportException.Create( TTransportException.TExceptionType.TimedOut,
+                                            'Pipe write timed out');
+        end;
+        if (dwWait <> WAIT_OBJECT_0)
+        or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbWritten, TRUE)
+        then raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
+                                               'Pipe write error');
+      end;
+    else
+      raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
+                                        SysErrorMessage(dwError));
+    end;
+  end;
+
+  ASSERT( DWORD(count) = cbWritten);
+end;
+
+
+function TPipeStreamBase.ReadOverlapped( var buffer: TBytes; offset, count: Integer): Integer;
+// Reads up to count bytes into buffer[offset] using overlapped I/O, waiting up to FTimeout.
+// Returns the number of bytes read; raises TTransportException (NotOpen, TimedOut, Unknown).
+var cbRead, dwWait, dwError  : DWORD;
+    overlapped : IOverlappedHelper;
+begin
+  if not IsOpen
+  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+                                         'Called read on non-open pipe');
+
+  overlapped := TOverlappedHelperImpl.Create;
+
+  // read the data
+  if not ReadFile( FPipe, buffer[offset], count, cbRead, overlapped.OverlappedPtr) then begin
+    dwError := GetLastError;
+    case dwError of
+      ERROR_IO_PENDING : begin
+        dwWait := overlapped.WaitFor(FTimeout);
+        if (dwWait = WAIT_TIMEOUT) then begin
+          CancelIo( FPipe);  // request is still pending: cancel it before the helper and buffer go away
+          raise TTransportException.Create( TTransportException.TExceptionType.TimedOut,
+                                            'Pipe read timed out');
+        end;
+        if (dwWait <> WAIT_OBJECT_0)
+        or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbRead, TRUE)
+        then raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
+                                               'Pipe read error');
+      end;
+
+    else
+      raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
+                                        SysErrorMessage(dwError));
+    end;
+  end;
+
+  ASSERT( cbRead > 0);  // see TTransportImpl.ReadAll()
+  ASSERT( cbRead = DWORD(count));
+  result := cbRead;
+end;
+
+
 function TPipeStreamBase.ToArray: TBytes;
 var bytes : LongInt;
 begin
@@ -357,11 +470,13 @@
 { TNamedPipeStreamImpl }
 
 
-constructor TNamedPipeStreamImpl.Create( const aPipeName : string; const aShareMode: DWORD;
+constructor TNamedPipeStreamImpl.Create( const aPipeName : string;
+                                         const aEnableOverlapped : Boolean;
+                                         const aShareMode: DWORD;
                                          const aSecurityAttributes: PSecurityAttributes;
                                          const aTimeOut : DWORD);
 begin
-  inherited Create( aTimeout);
+  inherited Create( aEnableOverlapped, aTimeout);
 
   FPipeName        := aPipeName;
   FShareMode       := aShareMode;
@@ -374,7 +489,6 @@
 
 procedure TNamedPipeStreamImpl.Open;
 var hPipe    : THandle;
-    dwMode   : DWORD;
 begin
   if IsOpen then Exit;
 
@@ -389,21 +503,13 @@
                        FShareMode,        // sharing
                        FSecurityAttribs,  // security attributes
                        OPEN_EXISTING,     // opens existing pipe
-                       0,                 // default attributes
+                       FILE_FLAG_OVERLAPPED or FILE_FLAG_WRITE_THROUGH, // async+fast, please
                        0);                // no template file
 
   if hPipe = INVALID_HANDLE_VALUE
   then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                          'Unable to open pipe, '+SysErrorMessage(GetLastError));
 
-  // pipe connected; change to message-read mode.
-  dwMode := PIPE_READMODE_MESSAGE;
-  if not SetNamedPipeHandleState( hPipe, dwMode, nil, nil) then begin
-    Close;
-    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
-                                      'SetNamedPipeHandleState failed');
-  end;
-
   // everything fine
   FPipe := hPipe;
 end;
@@ -412,9 +518,11 @@
 { THandlePipeStreamImpl }
 
 
-constructor THandlePipeStreamImpl.Create( const aPipeHandle : THandle; aOwnsHandle : Boolean);
+constructor THandlePipeStreamImpl.Create( const aPipeHandle : THandle;
+                                          const aOwnsHandle, aEnableOverlapped : Boolean;
+                                          const aTimeOut : DWORD);
 begin
-  inherited Create( DEFAULT_THRIFT_PIPE_TIMEOUT);
+  inherited Create( aEnableOverlapped, aTimeOut);
 
   if aOwnsHandle
   then FSrcHandle := aPipeHandle
@@ -474,16 +582,17 @@
 // Named pipe constructor
 begin
   inherited Create( nil, nil);
-  FInputStream  := TNamedPipeStreamImpl.Create( aPipeName, aShareMode, aSecurityAttributes, aTimeOut);
+  FInputStream  := TNamedPipeStreamImpl.Create( aPipeName, TRUE, aShareMode, aSecurityAttributes, aTimeOut);
   FOutputStream := FInputStream;  // true for named pipes
 end;
 
 
-constructor TNamedPipeTransportClientEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean);
+constructor TNamedPipeTransportClientEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean;
+                                                     const aTimeOut : DWORD);
 // Named pipe constructor
 begin
   inherited Create( nil, nil);
-  FInputStream  := THandlePipeStreamImpl.Create( aPipe, aOwnsHandle);
+  FInputStream  := THandlePipeStreamImpl.Create( aPipe, aOwnsHandle, TRUE, aTimeOut);  // args: handle, owns, overlapped, timeout
   FOutputStream := FInputStream;  // true for named pipes
 end;
 
@@ -491,11 +600,12 @@
 { TNamedPipeTransportServerEndImpl }
 
 
-constructor TNamedPipeTransportServerEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean);
+constructor TNamedPipeTransportServerEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean;
+                                                     const aTimeOut : DWORD);
 // Named pipe constructor
 begin
   FHandle := DuplicatePipeHandle( aPipe);
-  inherited Create( aPipe, aOwnsHandle);
+  inherited Create( aPipe, aOwnsHandle, aTimeOut);
 end;
 
 
@@ -516,23 +626,48 @@
 // Anonymous pipe constructor
 begin
   inherited Create( nil, nil);
-  FInputStream  := THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles);
-  FOutputStream := THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles);
+  // overlapped is not supported with AnonPipes, see MSDN
+  FInputStream  := THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles, FALSE);
+  FOutputStream := THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles, FALSE);
 end;
 
 
 { TPipeServerTransportBase }
 
 
+constructor TPipeServerTransportBase.Create;
+begin
+  inherited Create;
+  FStopServer := TEvent.Create(nil,TRUE,FALSE,'');  // manual reset, initially non-signaled, unnamed
+end;
+
+
+destructor TPipeServerTransportBase.Destroy;
+begin
+  try
+    FreeAndNil( FStopServer);  // once nil, QueryStopServer reports TRUE via its nil check
+  finally
+    inherited Destroy;  // runs even if releasing the event raised
+  end;
+end;
+
+
+function TPipeServerTransportBase.QueryStopServer : Boolean;
+begin  // polls the stop event with a zero timeout
+  if FStopServer = nil then Exit( TRUE);  // no event object: treated as "stop requested"
+  result := not (FStopServer.WaitFor(0) = wrTimeout);
+end;
+
+
 procedure TPipeServerTransportBase.Listen;
 begin
-  FStopServer := FALSE;
+  FStopServer.ResetEvent;  // clear any stop request set by an earlier Close
 end;
 
 
 procedure TPipeServerTransportBase.Close;
 begin
-  FStopServer := TRUE;
+  FStopServer.SetEvent;  // signal before closing, so a wait on this event (e.g. in the named pipe Accept) wakes up
   InternalClose;
 end;
 
@@ -660,6 +795,7 @@
 // Named Pipe CTOR
 begin
   inherited Create;
+  ASSERT( aTimeout > 0);
   FPipeName  := aPipename;
   FBufsize   := aBufSize;
   FMaxConns  := Max( 1, Min( PIPE_UNLIMITED_INSTANCES, aMaxConns));
@@ -674,61 +810,54 @@
 
 function TNamedPipeServerTransportImpl.Accept(const fnAccepting: TProc): ITransport;
 var dwError, dwWait, dwDummy : DWORD;
-    overlapped : TOverlapped;
-    event      : TEvent;
+    overlapped : IOverlappedHelper;
+    handles : array[0..1] of THandle;  // [0] = overlapped connect event, [1] = stop event
 begin
-  FillChar( overlapped, SizeOf(overlapped), 0);
-  event := TEvent.Create( nil, TRUE, FALSE, '');  // always ManualReset, see MSDN
-  try
-    overlapped.hEvent := event.Handle;
+  overlapped := TOverlappedHelperImpl.Create;
 
-    ASSERT( not FConnected);
-    while not FConnected do begin
-      InternalClose;
-      if FStopServer then Abort;
-      CreateNamedPipe;
+  ASSERT( not FConnected);
+  while not FConnected do begin
+    InternalClose;
+    if QueryStopServer then Abort;  // stop requested: leave Accept by raising EAbort
+    CreateNamedPipe;
 
-      if Assigned(fnAccepting)
-      then fnAccepting();
+    if Assigned(fnAccepting)
+    then fnAccepting();
 
-      // Wait for the client to connect; if it succeeds, the
-      // function returns a nonzero value. If the function returns
-      // zero, GetLastError should return ERROR_PIPE_CONNECTED.
-      if ConnectNamedPipe( Handle, @overlapped) then begin
-        FConnected := TRUE;
-        Break;
-      end;
-
-      // ConnectNamedPipe() returns FALSE for OverlappedIO, even if connected.
-      // We have to check GetLastError() explicitly to find out
-      dwError := GetLastError;
-      case dwError of
-        ERROR_PIPE_CONNECTED : begin
-          FConnected := not FStopServer;  // special case: pipe immediately connected
-        end;
-
-        ERROR_IO_PENDING : begin
-          repeat
-            dwWait := WaitForSingleObject( overlapped.hEvent, DEFAULT_THRIFT_PIPE_TIMEOUT);
-          until (dwWait <> WAIT_TIMEOUT) or FStopServer;
-          FConnected := (dwWait = WAIT_OBJECT_0)
-                    and GetOverlappedResult( Handle, overlapped, dwDummy, TRUE)
-                    and not FStopServer;
-        end;
-
-      else
-        InternalClose;
-        raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
-                                          'Client connection failed');
-      end;
+    // Wait for the client to connect; if it succeeds, the
+    // function returns a nonzero value. If the function returns
+    // zero, GetLastError should return ERROR_PIPE_CONNECTED.
+    if ConnectNamedPipe( Handle, overlapped.OverlappedPtr) then begin
+      FConnected := TRUE;
+      Break;
     end;
 
-    // create the transport impl
-    result := CreateTransportInstance;
+    // ConnectNamedPipe() returns FALSE for OverlappedIO, even if connected.
+    // We have to check GetLastError() explicitly to find out
+    dwError := GetLastError;
+    case dwError of
+      ERROR_PIPE_CONNECTED : begin
+        FConnected := not QueryStopServer;  // special case: pipe immediately connected
+      end;
 
-  finally
-    event.Free;
+      ERROR_IO_PENDING : begin
+        handles[0] := overlapped.WaitHandle;  // signaled -> WAIT_OBJECT_0: the pending connect finished
+        handles[1] := FStopServer.Handle;  // signaled by Close: ends the wait without a connection
+        dwWait := WaitForMultipleObjects( 2, @handles, FALSE, FTimeout);  // wait-any; timeout or stop leaves FConnected FALSE, loop iterates again
+        FConnected := (dwWait = WAIT_OBJECT_0)
+                  and GetOverlappedResult( Handle, overlapped.Overlapped, dwDummy, TRUE)
+                  and not QueryStopServer;
+      end;
+
+    else
+      InternalClose;
+      raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+                                        'Client connection failed');
+    end;
   end;
+
+  // create the transport impl
+  result := CreateTransportInstance;
 end;
 
 
@@ -739,7 +868,7 @@
   hPipe := THandle( InterlockedExchangePointer( Pointer(FHandle), Pointer(INVALID_HANDLE_VALUE)));
   try
     FConnected := FALSE;
-    result := TNamedPipeTransportServerEndImpl.Create( hPipe, TRUE);
+    result := TNamedPipeTransportServerEndImpl.Create( hPipe, TRUE, FTimeout);
   except
     ClosePipeHandle(hPipe);
     raise;
@@ -819,8 +948,8 @@
     result := Windows.CreateNamedPipe( PChar( FPipeName),        // pipe name
                                        PIPE_ACCESS_DUPLEX or     // read/write access
                                        FILE_FLAG_OVERLAPPED,     // async mode
-                                       PIPE_TYPE_MESSAGE or      // message type pipe
-                                       PIPE_READMODE_MESSAGE,    // message-read mode
+                                       PIPE_TYPE_BYTE or         // byte type pipe
+                                       PIPE_READMODE_BYTE,       // byte read mode
                                        FMaxConns,                // max. instances
                                        FBufSize,                 // output buffer size
                                        FBufSize,                 // input buffer size