Add thread pool option to NonblockingServer

Summary: Adds an optional ThreadManager to TNonblockingServer, for use when you want requests processed in a thread pool outside of the I/O (libevent) thread.

Reviewed By: jake luciani, aditya

Test Plan: nb-main.cpp, in the test folder


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665132 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 6337806..5c4dc8c 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -6,6 +6,7 @@
 
 #include "TNonblockingServer.h"
 
+#include <iostream>
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <netinet/tcp.h>
@@ -17,6 +18,50 @@
 
 using namespace facebook::thrift::protocol;
 using namespace facebook::thrift::transport;
+using namespace std;
+
+class TConnection::Task: public Runnable {
+ public:
+  Task(boost::shared_ptr<TProcessor> processor,
+       boost::shared_ptr<TProtocol> input,
+       boost::shared_ptr<TProtocol> output,
+       int taskHandle) :
+    processor_(processor),
+    input_(input),
+    output_(output),
+    taskHandle_(taskHandle) {}
+
+  void run() {
+    try {
+      while (processor_->process(input_, output_)) {
+        if (!input_->getTransport()->peek()) {
+          break;
+        }
+      }
+    } catch (TTransportException& ttx) {
+      cerr << "TThreadedServer client died: " << ttx.what() << endl;
+    } catch (TException& x) {
+      cerr << "TThreadedServer exception: " << x.what() << endl;
+    } catch (...) {
+      cerr << "TThreadedServer uncaught exception." << endl;
+    }
+    
+    // Signal completion back to the libevent thread via a socketpair
+    int8_t b = 0;
+    if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
+      GlobalOutput("TNonblockingServer::Task: send");
+    }
+    if (-1 == ::close(taskHandle_)) {
+      GlobalOutput("TNonblockingServer::Task: close, possible resource leak");
+    }
+  }
+
+ private:
+  boost::shared_ptr<TProcessor> processor_;
+  boost::shared_ptr<TProtocol> input_;
+  boost::shared_ptr<TProtocol> output_;
+  int taskHandle_;
+};
 
 void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
   socket_ = socket;
@@ -34,6 +79,8 @@
   socketState_ = SOCKET_RECV;
   appState_ = APP_INIT;
   
+  taskHandle_ = -1;
+
   // Set flags, which also registers the event
   setFlags(eventFlags);
 
@@ -168,24 +215,59 @@
     // and get back some data from the dispatch function
     inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
     outputTransport_->resetBuffer();
+  
+    if (server_->isThreadPoolProcessing()) {
+      // We are setting up a Task to do this work and we will wait on it
+      int sv[2];
+      if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
+        GlobalOutput("TConnection::socketpair() failed");
+        // Now we will fall through to the APP_WAIT_TASK block with no response
+      } else {
+        // Create task and dispatch to the thread manager
+        boost::shared_ptr<Runnable> task =
+          boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
+                                               inputProtocol_,
+                                               outputProtocol_,
+                                               sv[1]));
+        appState_ = APP_WAIT_TASK;
+        event_set(&taskEvent_,
+                  taskHandle_ = sv[0],
+                  EV_READ,
+                  TConnection::taskHandler,
+                  this);
 
-    try {
-      // Invoke the processor
-      server_->getProcessor()->process(inputProtocol_, outputProtocol_);
-    } catch (TTransportException &ttx) {
-      fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
-      close();
-      return;
-    } catch (TException &x) {
-      fprintf(stderr, "TException: Server::process() %s\n", x.what());
-      close();     
-      return;
-    } catch (...) {
-      fprintf(stderr, "Server::process() unknown exception\n");
-      close();
-      return;
+        // Register the task-completion event, then hand the task to the thread pool
+        if (-1 == event_add(&taskEvent_, 0)) {
+          GlobalOutput("TNonblockingServer::serve(): coult not event_add");
+          return;
+        }
+        server_->addTask(task);
+        return;
+      }
+    } else {
+      try {
+        // Invoke the processor
+        server_->getProcessor()->process(inputProtocol_, outputProtocol_);
+      } catch (TTransportException &ttx) {
+        fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
+        close();
+        return;
+      } catch (TException &x) {
+        fprintf(stderr, "TException: Server::process() %s\n", x.what());
+        close();     
+        return;
+      } catch (...) {
+        fprintf(stderr, "Server::process() unknown exception\n");
+        close();
+        return;
+      }
     }
 
+  case APP_WAIT_TASK:
+    // We have now finished processing a task and the result has been written
+    // into the outputTransport_, so we grab its contents and place them into
+    // the writeBuffer_ for actual writing by the libevent thread
+
     // Get the result of the operation
     outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
 
@@ -212,7 +294,7 @@
       setWrite();
 
       // Try to work the socket immediately
-      workSocket();
+      // workSocket();
 
       return;
     }
@@ -232,7 +314,7 @@
     appState_ = APP_SEND_RESULT;
 
     // Go to work on the socket right away, probably still writeable
-    workSocket();
+    // workSocket();
 
     return;
 
@@ -260,7 +342,7 @@
     setRead();
 
     // Try to work the socket right away
-    workSocket();
+    // workSocket();
 
     return;
 
@@ -283,7 +365,7 @@
     appState_ = APP_READ_REQUEST;
 
     // Work the socket right away
-    workSocket();
+    // workSocket();
 
     return;
 
@@ -315,13 +397,13 @@
    *
    * Prepares the event structure &event to be used in future calls to
    * event_add() and event_del().  The event will be prepared to call the
-   * event_handler using the 'sock' file descriptor to monitor events.
+   * eventHandler using the 'sock' file descriptor to monitor events.
    *
    * The events can be either EV_READ, EV_WRITE, or both, indicating
    * that an application can read or write from the file respectively without
    * blocking.
    *
-   * The event_handler will be called with the file descriptor that triggered
+   * The eventHandler will be called with the file descriptor that triggered
    * the event and the type of event which will be one of: EV_TIMEOUT,
    * EV_SIGNAL, EV_READ, EV_WRITE.
    *
@@ -330,7 +412,7 @@
    *
    * Once initialized, the &event struct can be used repeatedly with
    * event_add() and event_del() and does not need to be reinitialized unless
-   * the event_handler and/or the argument to it are to be changed.  However,
+   * the eventHandler and/or the argument to it are to be changed.  However,
    * when an ev structure has been added to libevent using event_add() the
    * structure must persist until the event occurs (assuming EV_PERSIST
    * is not set) or is removed using event_del().  You may not reuse the same
@@ -516,12 +598,12 @@
             this);
 
   // Add the event and start up the server
-  if (event_add(&serverEvent, 0) == -1) {
+  if (-1 == event_add(&serverEvent, 0)) {
     GlobalOutput("TNonblockingServer::serve(): coult not event_add");
     return;
   }
 
-  // Run libevent engine, never returns, invokes calls to event_handler
+  // Run libevent engine, never returns, invokes calls to eventHandler
   event_loop(0);
 }
 
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index e4f8346..0ce7ccb 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -10,6 +10,7 @@
 #include <Thrift.h>
 #include <server/TServer.h>
 #include <transport/TTransportUtils.h>
+#include <concurrency/ThreadManager.h>
 #include <stack>
 #include <event.h>
 
@@ -17,6 +18,8 @@
 
 using facebook::thrift::transport::TMemoryBuffer;
 using facebook::thrift::protocol::TProtocol;
+using facebook::thrift::concurrency::Runnable;
+using facebook::thrift::concurrency::ThreadManager;
 
 // Forward declaration of class
 class TConnection;
@@ -46,6 +49,12 @@
   // Whether to frame responses
   bool frameResponses_;
 
+  // For processing via thread pool, may be NULL
+  boost::shared_ptr<ThreadManager> threadManager_;
+
+  // Is thread pool processing?
+  bool threadPoolProcessing_;
+
   /**
    * This is a stack of all the objects that have been created but that
    * are NOT currently in use. When we close a connection, we place it on this
@@ -66,15 +75,18 @@
 
   TNonblockingServer(boost::shared_ptr<TProcessor> processor, 
                      boost::shared_ptr<TProtocolFactory> protocolFactory,
-                     int port) :
+                     int port,
+                     boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
     TServer(processor),
     serverSocket_(0),
     port_(port),
-    frameResponses_(true) {
+    frameResponses_(true),
+    threadManager_(threadManager) {
     setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setInputProtocolFactory(protocolFactory);
     setOutputProtocolFactory(protocolFactory);
+    setThreadManager(threadManager);
   }
 
   TNonblockingServer(boost::shared_ptr<TProcessor> processor,
@@ -82,19 +94,35 @@
                      boost::shared_ptr<TTransportFactory> outputTransportFactory,
                      boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
                      boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
-                     int port) :
+                     int port,
+                     boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
     TServer(processor),
     serverSocket_(0),
     port_(port),
-    frameResponses_(true) {
+    frameResponses_(true),
+    threadManager_(threadManager) {
     setInputTransportFactory(inputTransportFactory);
     setOutputTransportFactory(outputTransportFactory);
     setInputProtocolFactory(inputProtocolFactory);
     setOutputProtocolFactory(outputProtocolFactory);
+    setThreadManager(threadManager);
   }
         
   ~TNonblockingServer() {}
 
+  void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
+    threadManager_ = threadManager;
+    threadPoolProcessing_ = (threadManager != NULL);
+  }
+
+  bool isThreadPoolProcessing() {
+    return threadPoolProcessing_;
+  }
+
+  void addTask(boost::shared_ptr<Runnable> task) {
+    threadManager_->add(task);
+  }
+
   void setFrameResponses(bool frameResponses) {
     frameResponses_ = frameResponses;
   }
@@ -133,6 +161,7 @@
   APP_INIT,
   APP_READ_FRAME_SIZE,
   APP_READ_REQUEST,
+  APP_WAIT_TASK,
   APP_SEND_FRAME_SIZE,
   APP_SEND_RESULT
 };
@@ -144,6 +173,8 @@
 class TConnection {
  private:
 
+  class Task;
+
   // Server handle
   TNonblockingServer* server_;
 
@@ -186,6 +217,12 @@
   // Frame size
   int32_t frameSize_;
 
+  // Task handle
+  int taskHandle_;
+
+  // Task event
+  struct event taskEvent_;
+
   // Transport to read from
   boost::shared_ptr<TMemoryBuffer> inputTransport_;
 
@@ -248,9 +285,19 @@
 
   // Handler wrapper
   static void eventHandler(int fd, short which, void* v) {
-    assert(fd = ((TConnection*)v)->socket_);
+    assert(fd == ((TConnection*)v)->socket_);
     ((TConnection*)v)->workSocket();
   }
+  
+  // Handler wrapper for task block
+  static void taskHandler(int fd, short which, void* v) {
+    assert(fd == ((TConnection*)v)->taskHandle_);
+    if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
+      GlobalOutput("TConnection::taskHandler close handle failed, resource leak");
+    }
+    ((TConnection*)v)->transition();
+  }
+
 };
 
 }}} // facebook::thrift::server
diff --git a/lib/cpp/src/transport/TServerSocket.cpp b/lib/cpp/src/transport/TServerSocket.cpp
index 0d25bf0..2f6ce5d 100644
--- a/lib/cpp/src/transport/TServerSocket.cpp
+++ b/lib/cpp/src/transport/TServerSocket.cpp
@@ -68,8 +68,8 @@
     intSock1_ = -1;
     intSock2_ = -1;
   } else {
-    intSock1_ = sv[0];
-    intSock2_ = sv[1];
+    intSock1_ = sv[1];
+    intSock2_ = sv[0];
   }
 
   serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);