Proper shutdown functionality for Thrift servers

Reviewed By: karl


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665038 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h
index f7c4b3c..cc13665 100644
--- a/lib/cpp/src/concurrency/ThreadManager.h
+++ b/lib/cpp/src/concurrency/ThreadManager.h
@@ -68,6 +68,8 @@
    * block until all the workers have finished their work. At that point
    * the ThreadManager will transition into the STOPPED state.
    */
+  virtual void join() = 0;
+
   enum STATE {
     UNINITIALIZED,
     STARTING,
diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h
index a9bcb26..2936e42 100644
--- a/lib/cpp/src/server/TServer.h
+++ b/lib/cpp/src/server/TServer.h
@@ -31,6 +31,8 @@
 
   virtual void serve() = 0;
 
+  virtual void stop() {}
+
   // Allows running the server as a Runnable thread
   virtual void run() {
     serve();
diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp
index eb936f5..8657fab 100644
--- a/lib/cpp/src/server/TSimpleServer.cpp
+++ b/lib/cpp/src/server/TSimpleServer.cpp
@@ -34,7 +34,7 @@
   }
 
   // Fetch client from server
-  while (true) {
+  while (!stop_) {
     try {
       client = serverTransport_->accept();
       inputTransport = inputTransportFactory_->getTransport(client);
@@ -77,7 +77,14 @@
     }
   }
 
-  // TODO(mcslee): Could this be a timeout case? Or always the real thing?
+  if (stop_) {
+    try {
+      serverTransport_->close();
+    } catch (TTransportException &ttx) {
+      cerr << "TServerTransport failed on close: " << ttx.what() << endl;
+    }
+    stop_ = false;
+  }
 }
 
 }}} // facebook::thrift::server
diff --git a/lib/cpp/src/server/TSimpleServer.h b/lib/cpp/src/server/TSimpleServer.h
index 05befbf..0bb0365 100644
--- a/lib/cpp/src/server/TSimpleServer.h
+++ b/lib/cpp/src/server/TSimpleServer.h
@@ -26,7 +26,8 @@
                 shared_ptr<TServerTransport> serverTransport,
                 shared_ptr<TTransportFactory> transportFactory,
                 shared_ptr<TProtocolFactory> protocolFactory) :
-    TServer(processor, serverTransport, transportFactory, protocolFactory) {}
+    TServer(processor, serverTransport, transportFactory, protocolFactory),
+    stop_(false) {}
 
   TSimpleServer(shared_ptr<TProcessor> processor,
                 shared_ptr<TServerTransport> serverTransport,
@@ -36,12 +37,20 @@
                 shared_ptr<TProtocolFactory> outputProtocolFactory):
     TServer(processor, serverTransport, 
             inputTransportFactory, outputTransportFactory,
-            inputProtocolFactory, outputProtocolFactory) {}
+            inputProtocolFactory, outputProtocolFactory),
+    stop_(false) {}
     
   ~TSimpleServer() {}
 
   void serve();
 
+  void stop() {
+    stop_ = true;
+  }
+
+ protected:
+  bool stop_;
+
 };
 
 }}} // facebook::thrift::server
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
index 1a9898a..32a0223 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -65,7 +65,8 @@
                                      shared_ptr<TProtocolFactory> protocolFactory,
                                      shared_ptr<ThreadManager> threadManager) :
   TServer(processor, serverTransport, transportFactory, protocolFactory), 
-  threadManager_(threadManager) {}
+  threadManager_(threadManager),
+  stop_(false) {}
 
 TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
                                      shared_ptr<TServerTransport> serverTransport,
@@ -76,13 +77,13 @@
                                      shared_ptr<ThreadManager> threadManager) :
   TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
           inputProtocolFactory, outputProtocolFactory),
-  threadManager_(threadManager) {}
+  threadManager_(threadManager),
+  stop_(false) {}
 
 
 TThreadPoolServer::~TThreadPoolServer() {}
 
 void TThreadPoolServer::serve() {
-
   shared_ptr<TTransport> client;
   shared_ptr<TTransport> inputTransport;
   shared_ptr<TTransport> outputTransport;
@@ -97,7 +98,7 @@
     return;
   }
   
-  while (true) {   
+  while (!stop_) {
     try {
       // Fetch client from server
       client = serverTransport_->accept();
@@ -131,6 +132,18 @@
       break;
     }
   }
+
+  // If stopped manually, join the existing threads
+  if (stop_) {
+    try {
+      serverTransport_->close();
+      threadManager_->join();
+    } catch (TException &tx) {
+      cerr << "TThreadPoolServer: Exception shutting down: " << tx.what() << endl;
+    }
+  }
+  stop_ = false;
+
 }
 
 }}} // facebook::thrift::server
diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h
index d227930..f6809fc 100644
--- a/lib/cpp/src/server/TThreadPoolServer.h
+++ b/lib/cpp/src/server/TThreadPoolServer.h
@@ -40,10 +40,14 @@
   virtual ~TThreadPoolServer();
 
   virtual void serve();
+  
+  virtual void stop() { stop_ = true; }
 
  protected:
 
   shared_ptr<ThreadManager> threadManager_;
+
+  volatile bool stop_;
   
 };