THRIFT-1890 C++: Make named pipes server work asynchronously

Patch: Jens Geyer & Ben Craig
diff --git a/lib/cpp/src/thrift/transport/TPipeServer.cpp b/lib/cpp/src/thrift/transport/TPipeServer.cpp
index f4d4704..b11d22f 100644
--- a/lib/cpp/src/thrift/transport/TPipeServer.cpp
+++ b/lib/cpp/src/thrift/transport/TPipeServer.cpp
@@ -42,17 +42,22 @@
   pipename_(pipename),
   bufsize_(bufsize),
   Pipe_(INVALID_HANDLE_VALUE),
+  wakeup(INVALID_HANDLE_VALUE),
   maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT),
-  isAnonymous(false)
+  isAnonymous(false),
+  stop_(false)
  {
     setPipename(pipename);
+    createWakeupEvent();
  }
 
 TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize, uint32_t maxconnections) :
   pipename_(pipename),
   bufsize_(bufsize),
   Pipe_(INVALID_HANDLE_VALUE),
-  isAnonymous(false)
+  wakeup(INVALID_HANDLE_VALUE),
+  isAnonymous(false),
+  stop_(false)
  {  //Restrict maxconns_ to 1-PIPE_UNLIMITED_INSTANCES
     if(maxconnections == 0)
       maxconns_ = 1;
@@ -62,24 +67,30 @@
       maxconns_ = maxconnections;
 
     setPipename(pipename);
+    createWakeupEvent();
  }
 
 TPipeServer::TPipeServer(const std::string &pipename) :
   pipename_(pipename),
   bufsize_(1024),
   Pipe_(INVALID_HANDLE_VALUE),
+  wakeup(INVALID_HANDLE_VALUE),
   maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT),
-  isAnonymous(false)
+  isAnonymous(false),
+  stop_(false)
  {
     setPipename(pipename);
+    createWakeupEvent();
  }
 
 TPipeServer::TPipeServer(int bufsize) :
   pipename_(""),
   bufsize_(bufsize),
   Pipe_(INVALID_HANDLE_VALUE),
+  wakeup(INVALID_HANDLE_VALUE),
   maxconns_(1),
-  isAnonymous(true)
+  isAnonymous(true),
+  stop_(false)
  {
   //The anonymous pipe needs to be created first so that the server can
   //pass the handles on to the client before the serve (acceptImpl)
@@ -88,24 +99,30 @@
     GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError());
     throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed");
   }
+  createWakeupEvent();
 }
 
 TPipeServer::TPipeServer() :
   pipename_(""),
   bufsize_(1024),
   Pipe_(INVALID_HANDLE_VALUE),
+  wakeup(INVALID_HANDLE_VALUE),
   maxconns_(1),
-  isAnonymous(true)
+  isAnonymous(true),
+  stop_(false)
 {
   if (!TCreateAnonPipe()) {
     GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError());
     throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed");
   }
+  createWakeupEvent();
 }
 
 //---- Destructor ----
 TPipeServer::~TPipeServer() {
   close();
+  CloseHandle( wakeup);
+  wakeup = INVALID_HANDLE_VALUE;
 }
 
 //---------------------------------------------------------
@@ -115,6 +132,8 @@
 shared_ptr<TTransport> TPipeServer::acceptImpl() {
   shared_ptr<TPipe> client;
 
+  stop_ = FALSE;
+
   if(isAnonymous)
   { //Anonymous Pipe
     //This 0-byte read serves merely as a blocking call.
@@ -131,37 +150,79 @@
       GlobalOutput.perror("TPipeServer unable to initiate pipe comms, GLE=", GetLastError());
       throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer unable to initiate pipe comms");
     }
-	client.reset(new TPipe(Pipe_, PipeW_));
+    client.reset(new TPipe(Pipe_, PipeW_));
   }
   else
   { //Named Pipe
-    int ConnectRet;
-    while (true)
-    {
-      if (!TCreateNamedPipe()) {
-        GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError());
-        throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed");
-      }
-
-      // Wait for the client to connect; if it succeeds, the
-      // function returns a nonzero value. If the function returns
-      // zero, GetLastError should return ERROR_PIPE_CONNECTED.
-      ConnectRet = ConnectNamedPipe(Pipe_, NULL) ?
-                    TRUE : (GetLastError() == ERROR_PIPE_CONNECTED);
-
-      if (ConnectRet == TRUE)
-      {
-        GlobalOutput.printf("Client connected.");
-        break;
-      }
-      else
-      {
-        close();
-        GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", GetLastError());
-        throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed");
-      }
+    if (!TCreateNamedPipe()) {
+      GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError());
+      throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed");
     }
-	client.reset(new TPipe(Pipe_));
+
+    struct TEventCleaner {
+      HANDLE hEvent;
+      ~TEventCleaner() {CloseHandle(hEvent);}
+    };
+
+    OVERLAPPED overlapped;
+    memset( &overlapped, 0, sizeof(overlapped));
+    overlapped.hEvent = CreateEvent( NULL, TRUE, FALSE, NULL);
+    {
+      TEventCleaner cleaner = {overlapped.hEvent};
+      while( ! stop_)
+      {
+        // Wait for the client to connect; if it succeeds, the
+        // function returns a nonzero value. If the function returns
+        // zero, GetLastError should return ERROR_PIPE_CONNECTED.
+        if( ConnectNamedPipe(Pipe_, &overlapped))
+        {
+          GlobalOutput.printf("Client connected.");
+          client.reset(new TPipe(Pipe_));
+          return client;
+        }
+
+        DWORD dwErr = GetLastError();
+        HANDLE events[2] = {overlapped.hEvent, wakeup};
+        switch( dwErr)
+        {
+        case ERROR_PIPE_CONNECTED:
+          GlobalOutput.printf("Client connected.");
+          client.reset(new TPipe(Pipe_));
+          return client;
+
+        case ERROR_IO_PENDING:
+          DWORD dwWait, dwDummy;
+          dwWait = WaitForMultipleObjects( 2, events, FALSE, 3000);
+          switch(dwWait)
+          {
+          case WAIT_OBJECT_0:
+            if(GetOverlappedResult(Pipe_, &overlapped, &dwDummy, TRUE))
+            {
+              GlobalOutput.printf("Client connected.");
+              client.reset(new TPipe(Pipe_));
+              return client;
+            }
+            break;
+          case WAIT_OBJECT_0 + 1:
+            stop_ = TRUE;
+            break;
+          default:
+            break;
+          }
+          break;
+
+        default:
+          break;
+        }
+
+        CancelIo(Pipe_);
+        DisconnectNamedPipe(Pipe_);
+      }
+
+      close();
+      GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", GetLastError());
+      throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed");
+    }
   }
 
   return client;
@@ -169,7 +230,9 @@
 
 void TPipeServer::interrupt() {
   if(Pipe_ != INVALID_HANDLE_VALUE) {
+    stop_ = TRUE;
     CancelIo(Pipe_);
+    SetEvent(wakeup);
   }
 }
 
@@ -229,7 +292,8 @@
   // Create an instance of the named pipe
   HANDLE hPipe_ = CreateNamedPipe(
         pipename_.c_str(),        // pipe name
-        PIPE_ACCESS_DUPLEX,       // read/write access
+        PIPE_ACCESS_DUPLEX |      // read/write access
+        FILE_FLAG_OVERLAPPED,     // async mode
         PIPE_TYPE_MESSAGE |       // message type pipe
         PIPE_READMODE_MESSAGE,    // message-read mode
         maxconns_,                // max. instances
@@ -281,6 +345,11 @@
   return true;
 }
 
+void TPipeServer::createWakeupEvent() {
+  wakeup = CreateEvent( NULL, TRUE, FALSE, NULL);
+}
+
+
 //---------------------------------------------------------
 // Accessors
 //---------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TPipeServer.h b/lib/cpp/src/thrift/transport/TPipeServer.h
index 624a30a..4c211a0 100755
--- a/lib/cpp/src/thrift/transport/TPipeServer.h
+++ b/lib/cpp/src/thrift/transport/TPipeServer.h
@@ -56,6 +56,7 @@
 
   bool TCreateNamedPipe();
   bool TCreateAnonPipe();
+  void createWakeupEvent();
 
  public:
   //Accessors
@@ -77,8 +78,10 @@
   uint32_t maxconns_;
   HANDLE PipeW_; //Anonymous Pipe (W)
   HANDLE ClientAnonRead, ClientAnonWrite; //Client side anonymous pipe handles
+  HANDLE wakeup;  // wake up event
   //? Do we need duplicates to send to client?
   bool isAnonymous;
+  bool stop_; // stop flag
 };
 #else //_WIN32
 //*NIX named pipe implementation uses domain socket