Add thread pool option to NonblockingServer

Summary: Adds an optional ThreadManager to TNonblockingServer so that requests can be processed on a thread pool, outside of the I/O thread.

Reviewed By: jake luciani, aditya

Test Plan: nb-main.cpp, in the test folder


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665132 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index e4f8346..0ce7ccb 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -10,6 +10,7 @@
 #include <Thrift.h>
 #include <server/TServer.h>
 #include <transport/TTransportUtils.h>
+#include <concurrency/ThreadManager.h>
 #include <stack>
 #include <event.h>
 
@@ -17,6 +18,8 @@
 
 using facebook::thrift::transport::TMemoryBuffer;
 using facebook::thrift::protocol::TProtocol;
+using facebook::thrift::concurrency::Runnable;
+using facebook::thrift::concurrency::ThreadManager;
 
 // Forward declaration of class
 class TConnection;
@@ -46,6 +49,12 @@
   // Whether to frame responses
   bool frameResponses_;
 
+  // For processing via thread pool, may be NULL
+  boost::shared_ptr<ThreadManager> threadManager_;
+
+  // Is thread pool processing?
+  bool threadPoolProcessing_;
+
   /**
    * This is a stack of all the objects that have been created but that
    * are NOT currently in use. When we close a connection, we place it on this
@@ -66,15 +75,18 @@
 
   TNonblockingServer(boost::shared_ptr<TProcessor> processor, 
                      boost::shared_ptr<TProtocolFactory> protocolFactory,
-                     int port) :
+                     int port,
+                     boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
     TServer(processor),
     serverSocket_(0),
     port_(port),
-    frameResponses_(true) {
+    frameResponses_(true),
+    threadManager_(threadManager) {
     setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setInputProtocolFactory(protocolFactory);
     setOutputProtocolFactory(protocolFactory);
+    setThreadManager(threadManager);
   }
 
   TNonblockingServer(boost::shared_ptr<TProcessor> processor,
@@ -82,19 +94,35 @@
                      boost::shared_ptr<TTransportFactory> outputTransportFactory,
                      boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
                      boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
-                     int port) :
+                     int port,
+                     boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
     TServer(processor),
     serverSocket_(0),
     port_(port),
-    frameResponses_(true) {
+    frameResponses_(true),
+    threadManager_(threadManager) {
     setInputTransportFactory(inputTransportFactory);
     setOutputTransportFactory(outputTransportFactory);
     setInputProtocolFactory(inputProtocolFactory);
     setOutputProtocolFactory(outputProtocolFactory);
+    setThreadManager(threadManager);
   }
         
   ~TNonblockingServer() {}
 
+  void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) { // Installs the manager used for thread pool processing; an empty pointer turns thread pool processing off.
+    threadManager_ = threadManager;
+    threadPoolProcessing_ = (threadManager != NULL); // The flag is true exactly when a non-null manager was supplied.
+  }
+
+  bool isThreadPoolProcessing() { // Reports the flag maintained by setThreadManager().
+    return threadPoolProcessing_;
+  }
+
+  void addTask(boost::shared_ptr<Runnable> task) { // Forwards the task to threadManager_->add().
+    threadManager_->add(task); // NOTE(review): no NULL check, although threadManager_ may be NULL; presumably callers test isThreadPoolProcessing() first -- confirm.
+  }
+
   void setFrameResponses(bool frameResponses) {
     frameResponses_ = frameResponses;
   }
@@ -133,6 +161,7 @@
   APP_INIT,
   APP_READ_FRAME_SIZE,
   APP_READ_REQUEST,
+  APP_WAIT_TASK,
   APP_SEND_FRAME_SIZE,
   APP_SEND_RESULT
 };
@@ -144,6 +173,8 @@
 class TConnection {
  private:
 
+  class Task;
+
   // Server handle
   TNonblockingServer* server_;
 
@@ -186,6 +217,12 @@
   // Frame size
   int32_t frameSize_;
 
+  // Task handle
+  int taskHandle_;
+
+  // Task event
+  struct event taskEvent_;
+
   // Transport to read from
   boost::shared_ptr<TMemoryBuffer> inputTransport_;
 
@@ -248,9 +285,19 @@
 
   // Handler wrapper
   static void eventHandler(int fd, short which, void* v) {
-    assert(fd = ((TConnection*)v)->socket_);
+    assert(fd == ((TConnection*)v)->socket_);
     ((TConnection*)v)->workSocket();
   }
+  
+  // Handler wrapper for task block
+  static void taskHandler(int fd, short which, void* v) { // v is the TConnection; presumably registered as the callback for taskEvent_ -- confirm.
+    assert(fd == ((TConnection*)v)->taskHandle_); // Debug-only sanity check that the event fired on this connection's task handle.
+    if (-1 == ::close(((TConnection*)v)->taskHandle_)) { // Close the task handle before moving on.
+      GlobalOutput("TConnection::taskHandler close handle failed, resource leak"); // A failed close is only logged; processing continues.
+    }
+    ((TConnection*)v)->transition(); // Advance the connection even if the close failed.
+  }
+
 };
 
 }}} // facebook::thrift::server