THRIFT-1442 TNonblockingServer: Refactor to allow multiple IO Threads
Patch: Pavlin Radoslavov
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index 398eade..dcc90e2 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -1191,13 +1191,12 @@
   }
 }
 
-/**
- * Main workhorse function, starts up the server listening on a port and
- * loops over the libevent handler.
- */
-void TNonblockingServer::serve() {
+void TNonblockingServer::registerEvents(event_base* user_event_base) {
+  userEventBase_ = user_event_base;
+
   // init listen socket
-  createAndListenOnSocket();
+  if (serverSocket_ < 0)
+    createAndListenOnSocket();
 
   // set up the IO threads
   assert(ioThreads_.empty());
@@ -1248,6 +1247,18 @@
     }
   }
 
+  // Register the events for the primary (listener) IO thread
+  ioThreads_[0]->registerEvents();
+}
+
+/**
+ * Main workhorse function, starts up the server listening on a port and
+ * loops over the libevent handler.
+ */
+void TNonblockingServer::serve() {
+
+  registerEvents(NULL);
+
   // Run the primary (listener) IO thread loop in our main thread; this will
   // only return when the server is shutting down.
   ioThreads_[0]->run();
@@ -1267,7 +1278,8 @@
       , number_(number)
       , listenSocket_(listenSocket)
       , useHighPriority_(useHighPriority)
-      , eventBase_(NULL) {
+      , eventBase_(NULL)
+      , ownEventBase_(false) {
   notificationPipeFDs_[0] = -1;
   notificationPipeFDs_[1] = -1;
 }
@@ -1276,8 +1288,9 @@
   // make sure our associated thread is fully finished
   join();
 
-  if (eventBase_) {
+  if (eventBase_ && ownEventBase_) {
     event_base_free(eventBase_);
+    ownEventBase_ = false;
   }
 
   if (listenSocket_ >= 0) {
@@ -1330,6 +1343,22 @@
  * Register the core libevent events onto the proper base.
  */
 void TNonblockingIOThread::registerEvents() {
+  threadId_ = Thread::get_current();
+
+  assert(eventBase_ == 0);
+  eventBase_ = getServer()->getUserEventBase();
+  if (eventBase_ == NULL) {
+    eventBase_ = event_base_new();
+    ownEventBase_ = true;
+  }
+
+  // Print some libevent stats
+  if (number_ == 0) {
+    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
+            event_get_version(),
+            event_base_get_method(eventBase_));
+  }
+
   if (listenSocket_ >= 0) {
     // Register the server event
     event_set(&serverEvent_,
@@ -1478,20 +1507,8 @@
 }
 
 void TNonblockingIOThread::run() {
-  threadId_ = Thread::get_current();
-
-  assert(eventBase_ == 0);
-  eventBase_ = event_base_new();
-
-  // Print some libevent stats
-  if (number_ == 0) {
-    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
-            event_get_version(),
-            event_base_get_method(eventBase_));
-  }
-
-
-  registerEvents();
+  if (eventBase_ == NULL)
+    registerEvents();
 
   GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
                       number_);
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.h b/lib/cpp/src/thrift/server/TNonblockingServer.h
index 9e6ba17..585aa79 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.h
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.h
@@ -160,6 +160,9 @@
   /// Port server runs on
   int port_;
 
+  /// The optional user-provided event-base (for single-thread servers)
+  event_base* userEventBase_;
+
   /// For processing via thread pool, may be NULL
   boost::shared_ptr<ThreadManager> threadManager_;
 
@@ -279,6 +282,7 @@
     nextIOThread_ = 0;
     useHighPriorityIOThreads_ = false;
     port_ = port;
+    userEventBase_ = NULL;
     threadPoolProcessing_ = false;
     numTConnections_ = 0;
     numActiveProcessors_ = 0;
@@ -756,15 +760,6 @@
    */
   void stop();
 
- private:
-  /**
-   * Callback function that the threadmanager calls when a task reaches
-   * its expiration time.  It is needed to clean up the expired connection.
-   *
-   * @param task the runnable associated with the expired task.
-   */
-  void expireClose(boost::shared_ptr<Runnable> task);
-
   /// Creates a socket to listen on and binds it to the local port.
   void createAndListenOnSocket();
 
@@ -775,6 +770,32 @@
    * @param fd descriptor of socket to be initialized/
    */
   void listenSocket(THRIFT_SOCKET fd);
+
+  /**
+   * Register the optional user-provided event-base (for single-thread servers)
+   *
+   * This method should be used when the server is running in a single-thread
+   * mode, and the event base is provided by the user (i.e., the caller).
+   *
+   * @param user_event_base the user-provided event-base. The user is
+   * responsible for freeing the event base memory.
+   */
+  void registerEvents(event_base* user_event_base);
+
+  /**
+   * Returns the optional user-provided event-base (for single-thread servers).
+   */
+  event_base* getUserEventBase() const { return userEventBase_; }
+
+ private:
+  /**
+   * Callback function that the threadmanager calls when a task reaches
+   * its expiration time.  It is needed to clean up the expired connection.
+   *
+   * @param task the runnable associated with the expired task.
+   */
+  void expireClose(boost::shared_ptr<Runnable> task);
+
   /**
    * Return an initialized connection object.  Creates or recovers from
    * pool a TConnection and initializes it with the provided socket FD
@@ -847,6 +868,9 @@
   // Ensures that the event-loop thread is fully finished and shut down.
   void join();
 
+  /// Registers the events for the notification & listen sockets
+  void registerEvents();
+
  private:
   /**
    * C-callable event handler for signaling task completion.  Provides a
@@ -873,9 +897,6 @@
   /// Exits the loop ASAP in case of shutdown or error.
   void breakLoop(bool error);
 
-  /// Registers the events for the notification & listen sockets
-  void registerEvents();
-
   /// Create the pipe used to notify I/O process of task completion.
   void createNotificationPipe();
 
@@ -904,6 +925,10 @@
   /// pointer to eventbase to be used for looping
   event_base* eventBase_;
 
+  /// Set to true if this class is responsible for freeing the event base
+  /// memory.
+  bool ownEventBase_;
+
   /// Used with eventBase_ for connection events (only in listener thread)
   struct event serverEvent_;