Thrift-1442: TNonblockingServer: Refactor to allow multiple IO Threads
Client: cpp
Patch: Dave Watson

Ads multiple IO threads to TNonblocking Server


git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1210737 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index da36045..84e384c 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -26,7 +26,11 @@
 #include <transport/TSocket.h>
 #include <concurrency/ThreadManager.h>
 #include <climits>
+#include <concurrency/Thread.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Mutex.h>
 #include <stack>
+#include <vector>
 #include <string>
 #include <errno.h>
 #include <cstdlib>
@@ -35,6 +39,8 @@
 #endif
 #include <event.h>
 
+
+
 namespace apache { namespace thrift { namespace server {
 
 using apache::thrift::transport::TMemoryBuffer;
@@ -42,6 +48,11 @@
 using apache::thrift::protocol::TProtocol;
 using apache::thrift::concurrency::Runnable;
 using apache::thrift::concurrency::ThreadManager;
+using apache::thrift::concurrency::PosixThreadFactory;
+using apache::thrift::concurrency::ThreadFactory;
+using apache::thrift::concurrency::Thread;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::concurrency::Guard;
 
 #ifdef LIBEVENT_VERSION_NUMBER
 #define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
@@ -78,9 +89,10 @@
 }
 
 /**
- * This is a non-blocking server in C++ for high performance that operates a
- * single IO thread. It assumes that all incoming requests are framed with a
- * 4 byte length indicator and writes out responses using the same framing.
+ * This is a non-blocking server in C++ for high performance that
+ * operates a set of IO threads (by default only one). It assumes that
+ * all incoming requests are framed with a 4 byte length indicator and
+ * writes out responses using the same framing.
  *
  * It does not use the TServerTransport framework, but rather has socket
  * operations hardcoded for use with select.
@@ -95,10 +107,14 @@
   T_OVERLOAD_DRAIN_TASK_QUEUE  ///< Drop some tasks from head of task queue */
 };
 
+class TNonblockingIOThread;
+
 class TNonblockingServer : public TServer {
  private:
   class TConnection;
 
+  friend class TNonblockingIOThread;
+ private:
   /// Listen backlog
   static const int LISTEN_BACKLOG = 1024;
 
@@ -123,6 +139,18 @@
   /// # of calls before resizing oversized buffers (0 = check only on close)
   static const int RESIZE_BUFFER_EVERY_N = 512;
 
+  /// # of IO threads to use by default
+  static const int DEFAULT_IO_THREADS = 1;
+
+  /// File descriptor of an invalid socket
+  static const int INVALID_SOCKET = -1;
+
+  /// # of IO threads this server will use
+  size_t numIOThreads_;
+
+  /// Whether to set high scheduling priority for IO threads
+  bool useHighPriorityIOThreads_;
+
   /// Server socket file descriptor
   int serverSocket_;
 
@@ -135,15 +163,17 @@
   /// Is thread pool processing?
   bool threadPoolProcessing_;
 
-  /// The event base for libevent
-  event_base* eventBase_;
-  bool ownEventBase_;
+  // Factory to create the IO threads
+  boost::shared_ptr<PosixThreadFactory> ioThreadFactory_;
 
-  /// Event struct, used with eventBase_ for connection events
-  struct event serverEvent_;
+  // Vector of IOThread objects that will handle our IO
+  std::vector<boost::shared_ptr<TNonblockingIOThread> > ioThreads_;
 
-  /// Event struct, used with eventBase_ for task completion notification
-  struct event notificationEvent_;
+  // Index of next IO Thread to be used (for round-robin)
+  int nextIOThread_;
+
+  // Synchronizes access to connection stack and similar data
+  Mutex connMutex_;
 
   /// Number of TConnection object we've created
   size_t numTConnections_;
@@ -211,9 +241,6 @@
   /// Count of connections dropped on overload since server started
   uint64_t nTotalConnectionsDropped_;
 
-  /// File descriptors for pipe used for task completion notification.
-  evutil_socket_t notificationPipeFDs_[2];
-
   /**
    * This is a stack of all the objects that have been created but that
    * are NOT currently in use. When we close a connection, we place it on this
@@ -234,10 +261,11 @@
 
   void init(int port) {
     serverSocket_ = -1;
+    numIOThreads_ = DEFAULT_IO_THREADS;
+    nextIOThread_ = 0;
+    useHighPriorityIOThreads_ = false;
     port_ = port;
     threadPoolProcessing_ = false;
-    eventBase_ = NULL;
-    ownEventBase_ = false;
     numTConnections_ = 0;
     numActiveProcessors_ = 0;
     connectionStackLimit_ = CONNECTION_STACK_LIMIT;
@@ -360,6 +388,31 @@
   }
 
   /**
+   * Sets the number of IO threads used by this server. Can only be used before
+   * the call to serve() and has no effect afterwards.  We always use a
+   * PosixThreadFactory for the IO worker threads, because they must joinable
+   * for clean shutdown.
+   */
+  void setNumIOThreads(size_t numThreads) {
+    numIOThreads_ = numThreads;
+  }
+
+  /** Return whether the IO threads will get high scheduling priority */
+  bool useHighPriorityIOThreads() const {
+    return useHighPriorityIOThreads_;
+  }
+
+  /** Set whether the IO threads will get high scheduling priority. */
+  void setUseHighPriorityIOThreads(bool val) {
+    useHighPriorityIOThreads_ = val;
+  }
+
+  /** Return the number of IO threads used by this server. */
+  size_t getNumIOThreads() const {
+    return numIOThreads_;
+  }
+
+  /**
    * Get the maximum number of unused TConnection we will hold in reserve.
    *
    * @return the current limit on TConnection pool size.
@@ -385,20 +438,6 @@
     threadManager_->add(task, 0LL, taskExpireTime_);
   }
 
-  event_base* getEventBase() const {
-    return eventBase_;
-  }
-
-  /// Increment our count of the number of connected sockets.
-  void incrementNumConnections() {
-    ++numTConnections_;
-  }
-
-  /// Decrement our count of the number of connected sockets.
-  void decrementNumConnections() {
-    --numTConnections_;
-  }
-
   /**
    * Return the count of sockets currently connected to.
    *
@@ -431,11 +470,13 @@
 
   /// Increment the count of connections currently processing.
   void incrementActiveProcessors() {
+    Guard g(connMutex_);
     ++numActiveProcessors_;
   }
 
   /// Decrement the count of connections currently processing.
   void decrementActiveProcessors() {
+    Guard g(connMutex_);
     if (numActiveProcessors_ > 0) {
       --numActiveProcessors_;
     }
@@ -615,7 +656,7 @@
     idleReadBufferLimit_ = limit;
   }
 
-  
+
 
   /**
    * Get the maximum size of write buffer allocated to idle TConnection objects.
@@ -659,21 +700,48 @@
     resizeBufferEveryN_ = count;
   }
 
+  /**
+   * Main workhorse function, starts up the server listening on a port and
+   * loops over the libevent handler.
+   */
+  void serve();
 
+  /**
+   * Causes the server to terminate gracefully (can be called from any thread).
+   */
+  void stop();
 
+ private:
+  /**
+   * Callback function that the threadmanager calls when a task reaches
+   * its expiration time.  It is needed to clean up the expired connection.
+   *
+   * @param task the runnable associated with the expired task.
+   */
+  void expireClose(boost::shared_ptr<Runnable> task);
+
+  /// Creates a socket to listen on and binds it to the local port.
+  void createAndListenOnSocket();
+
+  /**
+   * Takes a socket created by createAndListenOnSocket() and sets various
+   * options on it to prepare for use in the server.
+   *
+   * @param fd descriptor of socket to be initialized/
+   */
+  void listenSocket(int fd);
   /**
    * Return an initialized connection object.  Creates or recovers from
    * pool a TConnection and initializes it with the provided socket FD
    * and flags.
    *
    * @param socket FD of socket associated with this connection.
-   * @param flags initial lib_event flags for this connection.
    * @param addr the sockaddr of the client
    * @param addrLen the length of addr
    * @return pointer to initialized TConnection object.
    */
-  TConnection* createConnection(int socket, short flags,
-                                const sockaddr* addr, socklen_t addrLen);
+  TConnection* createConnection(int socket, const sockaddr* addr,
+                                            socklen_t addrLen);
 
   /**
    * Returns a connection to pool or deletion.  If the connection pool
@@ -683,14 +751,67 @@
    * @param connection the TConection being returned.
    */
   void returnConnection(TConnection* connection);
+};
 
+class TNonblockingIOThread : public Runnable {
+ public:
+  // Creates an IO thread and sets up the event base.  The listenSocket should
+  // be a valid FD on which listen() has already been called.  If the
+  // listenSocket is < 0, accepting will not be done.
+  TNonblockingIOThread(TNonblockingServer* server,
+                       int number,
+                       int listenSocket,
+                       bool useHighPriority);
+
+  ~TNonblockingIOThread();
+
+  // Returns the event-base for this thread.
+  event_base* getEventBase() const { return eventBase_; }
+
+  // Returns the server for this thread.
+  TNonblockingServer* getServer() const { return server_; }
+
+  // Returns the number of this IO thread.
+  int getThreadNumber() const { return number_; }
+
+  // Returns the thread id associated with this object.  This should
+  // only be called after the thread has been started.
+  pthread_t getThreadId() const { return threadId_; }
+
+  // Returns the send-fd for task complete notifications.
+  int getNotificationSendFD() const { return notificationPipeFDs_[1]; }
+
+  // Returns the read-fd for task complete notifications.
+  int getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
+
+  // Returns the actual thread object associated with this IO thread.
+  boost::shared_ptr<Thread> getThread() const { return thread_; }
+
+  // Sets the actual thread object associated with this IO thread.
+  void setThread(const boost::shared_ptr<Thread>& t) { thread_ = t; }
+
+  // Used by TConnection objects to indicate processing has finished.
+  bool notify(TNonblockingServer::TConnection* conn);
+
+  // Enters the event loop and does not return until a call to stop().
+  virtual void run();
+
+  // Exits the event loop as soon as possible.
+  void stop();
+
+  // Ensures that the event-loop thread is fully finished and shut down.
+  void join();
+
+ private:
   /**
-   * Callback function that the threadmanager calls when a task reaches
-   * its expiration time.  It is needed to clean up the expired connection.
+   * C-callable event handler for signaling task completion.  Provides a
+   * callback that libevent can understand that will read a connection
+   * object's address from a pipe and call connection->transition() for
+   * that object.
    *
-   * @param task the runnable associated with the expired task.
+   * @param fd the descriptor the event occurred on.
    */
-  void expireClose(boost::shared_ptr<Runnable> task);
+  static void notifyHandler(int fd, short which, void* v);
 
   /**
    * C-callable event handler for listener events.  Provides a callback
@@ -700,63 +821,57 @@
    * @param which the flags associated with the event.
    * @param v void* callback arg where we placed TNonblockingServer's "this".
    */
-  static void eventHandler(evutil_socket_t fd, short which, void* v) {
+  static void listenHandler(evutil_socket_t fd, short which, void* v) {
     ((TNonblockingServer*)v)->handleEvent(fd, which);
   }
 
-  /// Creates a socket to listen on and binds it to the local port.
-  void listenSocket();
+  /// Exits the loop ASAP in case of shutdown or error.
+  void breakLoop(bool error);
 
-  /**
-   * Takes a socket created by listenSocket() and sets various options on it
-   * to prepare for use in the server.
-   *
-   * @param fd descriptor of socket to be initialized/
-   */
-  void listenSocket(int fd);
+  /// Registers the events for the notification & listen sockets
+  void registerEvents();
 
   /// Create the pipe used to notify I/O process of task completion.
   void createNotificationPipe();
 
-  /**
-   * Get notification pipe send descriptor.
-   *
-   * @return write fd for pipe.
-   */
-  evutil_socket_t getNotificationSendFD() const {
-    return notificationPipeFDs_[1];
-  }
+  /// Unregisters our events for notification and listen sockets.
+  void cleanupEvents();
 
-  /**
-   * Get notification pipe receive descriptor.
-   *
-   * @return read fd of pipe.
-   */
-  evutil_socket_t getNotificationRecvFD() const {
-    return notificationPipeFDs_[0];
-  }
+  /// Sets (or clears) high priority scheduling status for the current thread.
+  void setCurrentThreadHighPriority(bool value);
 
-  /**
-   * Register the core libevent events onto the proper base.
-   *
-   * @param base pointer to the event base to be initialized.
-   * @param ownEventBase if true, this server is responsible for
-   * freeing the event base memory.
-   */
-  void registerEvents(event_base* base, bool ownEventBase = true);
+ private:
+  /// associated server
+  TNonblockingServer* server_;
 
-  /**
-   * Main workhorse function, starts up the server listening on a port and
-   * loops over the libevent handler.
-   */
-  void serve();
+  /// thread number (for debugging).
+  const int number_;
 
-  /**
-   * May be called from a separate thread to cause serve() to return.
-   */
-  void stop();
+  /// The actual physical thread id.
+  pthread_t threadId_;
+
+  /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
+  int listenSocket_;
+
+  /// Sets a high scheduling priority when running
+  bool useHighPriority_;
+
+  /// pointer to eventbase to be used for looping
+  event_base* eventBase_;
+
+  /// Used with eventBase_ for connection events (only in listener thread)
+  struct event serverEvent_;
+
+  /// Used with eventBase_ for task completion notification
+  struct event notificationEvent_;
+
+ /// File descriptors for pipe used for task completion notification.
+  int notificationPipeFDs_[2];
+
+  /// Actual IO Thread
+  boost::shared_ptr<Thread> thread_;
 };
 
 }}} // apache::thrift::server
 
-#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_
+#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_