THRIFT-1890 C++: Make named pipes server work asynchronously
Patch: Jens Geyer & Ben Craig
diff --git a/lib/cpp/src/thrift/transport/TPipeServer.cpp b/lib/cpp/src/thrift/transport/TPipeServer.cpp
index f4d4704..b11d22f 100644
--- a/lib/cpp/src/thrift/transport/TPipeServer.cpp
+++ b/lib/cpp/src/thrift/transport/TPipeServer.cpp
@@ -42,17 +42,22 @@
pipename_(pipename),
bufsize_(bufsize),
Pipe_(INVALID_HANDLE_VALUE),
+ wakeup(INVALID_HANDLE_VALUE),
maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT),
- isAnonymous(false)
+ isAnonymous(false),
+ stop_(false)
{
setPipename(pipename);
+ createWakeupEvent();
}
TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize, uint32_t maxconnections) :
pipename_(pipename),
bufsize_(bufsize),
Pipe_(INVALID_HANDLE_VALUE),
- isAnonymous(false)
+ wakeup(INVALID_HANDLE_VALUE),
+ isAnonymous(false),
+ stop_(false)
{ //Restrict maxconns_ to 1-PIPE_UNLIMITED_INSTANCES
if(maxconnections == 0)
maxconns_ = 1;
@@ -62,24 +67,30 @@
maxconns_ = maxconnections;
setPipename(pipename);
+ createWakeupEvent();
}
TPipeServer::TPipeServer(const std::string &pipename) :
pipename_(pipename),
bufsize_(1024),
Pipe_(INVALID_HANDLE_VALUE),
+ wakeup(INVALID_HANDLE_VALUE),
maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT),
- isAnonymous(false)
+ isAnonymous(false),
+ stop_(false)
{
setPipename(pipename);
+ createWakeupEvent();
}
TPipeServer::TPipeServer(int bufsize) :
pipename_(""),
bufsize_(bufsize),
Pipe_(INVALID_HANDLE_VALUE),
+ wakeup(INVALID_HANDLE_VALUE),
maxconns_(1),
- isAnonymous(true)
+ isAnonymous(true),
+ stop_(false)
{
//The anonymous pipe needs to be created first so that the server can
//pass the handles on to the client before the serve (acceptImpl)
@@ -88,24 +99,30 @@
GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError());
throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed");
}
+ createWakeupEvent();
}
TPipeServer::TPipeServer() :
pipename_(""),
bufsize_(1024),
Pipe_(INVALID_HANDLE_VALUE),
+ wakeup(INVALID_HANDLE_VALUE),
maxconns_(1),
- isAnonymous(true)
+ isAnonymous(true),
+ stop_(false)
{
if (!TCreateAnonPipe()) {
GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError());
throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed");
}
+ createWakeupEvent();
}
//---- Destructor ----
TPipeServer::~TPipeServer() {
close();
+ CloseHandle( wakeup);
+ wakeup = INVALID_HANDLE_VALUE;
}
//---------------------------------------------------------
@@ -115,6 +132,8 @@
shared_ptr<TTransport> TPipeServer::acceptImpl() {
shared_ptr<TPipe> client;
+ stop_ = FALSE;
+
if(isAnonymous)
{ //Anonymous Pipe
//This 0-byte read serves merely as a blocking call.
@@ -131,37 +150,79 @@
GlobalOutput.perror("TPipeServer unable to initiate pipe comms, GLE=", GetLastError());
throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer unable to initiate pipe comms");
}
- client.reset(new TPipe(Pipe_, PipeW_));
+ client.reset(new TPipe(Pipe_, PipeW_));
}
else
{ //Named Pipe
- int ConnectRet;
- while (true)
- {
- if (!TCreateNamedPipe()) {
- GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError());
- throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed");
- }
-
- // Wait for the client to connect; if it succeeds, the
- // function returns a nonzero value. If the function returns
- // zero, GetLastError should return ERROR_PIPE_CONNECTED.
- ConnectRet = ConnectNamedPipe(Pipe_, NULL) ?
- TRUE : (GetLastError() == ERROR_PIPE_CONNECTED);
-
- if (ConnectRet == TRUE)
- {
- GlobalOutput.printf("Client connected.");
- break;
- }
- else
- {
- close();
- GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", GetLastError());
- throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed");
- }
+ if (!TCreateNamedPipe()) {
+ GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError());
+ throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed");
}
- client.reset(new TPipe(Pipe_));
+
+ struct TEventCleaner {
+ HANDLE hEvent;
+ ~TEventCleaner() {CloseHandle(hEvent);}
+ };
+
+ OVERLAPPED overlapped;
+ memset( &overlapped, 0, sizeof(overlapped));
+ overlapped.hEvent = CreateEvent( NULL, TRUE, FALSE, NULL);
+ {
+ TEventCleaner cleaner = {overlapped.hEvent};
+ while( ! stop_)
+ {
+ // Wait for the client to connect; if it succeeds, the
+ // function returns a nonzero value. If the function returns
+ // zero, GetLastError should return ERROR_PIPE_CONNECTED.
+ if( ConnectNamedPipe(Pipe_, &overlapped))
+ {
+ GlobalOutput.printf("Client connected.");
+ client.reset(new TPipe(Pipe_));
+ return client;
+ }
+
+ DWORD dwErr = GetLastError();
+ HANDLE events[2] = {overlapped.hEvent, wakeup};
+ switch( dwErr)
+ {
+ case ERROR_PIPE_CONNECTED:
+ GlobalOutput.printf("Client connected.");
+ client.reset(new TPipe(Pipe_));
+ return client;
+
+ case ERROR_IO_PENDING:
+ DWORD dwWait, dwDummy;
+ dwWait = WaitForMultipleObjects( 2, events, FALSE, 3000);
+ switch(dwWait)
+ {
+ case WAIT_OBJECT_0:
+ if(GetOverlappedResult(Pipe_, &overlapped, &dwDummy, TRUE))
+ {
+ GlobalOutput.printf("Client connected.");
+ client.reset(new TPipe(Pipe_));
+ return client;
+ }
+ break;
+ case WAIT_OBJECT_0 + 1:
+ stop_ = TRUE;
+ break;
+ default:
+ break;
+ }
+ break;
+
+ default:
+ break;
+ }
+
+ CancelIo(Pipe_);
+ DisconnectNamedPipe(Pipe_);
+ }
+
+ close();
+ GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", GetLastError());
+ throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed");
+ }
}
return client;
@@ -169,7 +230,9 @@
void TPipeServer::interrupt() {
if(Pipe_ != INVALID_HANDLE_VALUE) {
+ stop_ = TRUE;
CancelIo(Pipe_);
+ SetEvent(wakeup);
}
}
@@ -229,7 +292,8 @@
// Create an instance of the named pipe
HANDLE hPipe_ = CreateNamedPipe(
pipename_.c_str(), // pipe name
- PIPE_ACCESS_DUPLEX, // read/write access
+ PIPE_ACCESS_DUPLEX | // read/write access
+ FILE_FLAG_OVERLAPPED, // async mode
PIPE_TYPE_MESSAGE | // message type pipe
PIPE_READMODE_MESSAGE, // message-read mode
maxconns_, // max. instances
@@ -281,6 +345,11 @@
return true;
}
+void TPipeServer::createWakeupEvent() {
+ wakeup = CreateEvent( NULL, TRUE, FALSE, NULL);
+}
+
+
//---------------------------------------------------------
// Accessors
//---------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TPipeServer.h b/lib/cpp/src/thrift/transport/TPipeServer.h
index 624a30a..4c211a0 100755
--- a/lib/cpp/src/thrift/transport/TPipeServer.h
+++ b/lib/cpp/src/thrift/transport/TPipeServer.h
@@ -56,6 +56,7 @@
bool TCreateNamedPipe();
bool TCreateAnonPipe();
+ void createWakeupEvent();
public:
//Accessors
@@ -77,8 +78,10 @@
uint32_t maxconns_;
HANDLE PipeW_; //Anonymous Pipe (W)
HANDLE ClientAnonRead, ClientAnonWrite; //Client side anonymous pipe handles
+ HANDLE wakeup; // wake up event
//? Do we need duplicates to send to client?
bool isAnonymous;
+ bool stop_; // stop flag
};
#else //_WIN32
//*NIX named pipe implementation uses domain socket