Thrift-1442: TNonblockingServer: Refactor to allow multiple IO Threads
Client: cpp
Patch: Dave Watson
Ads multiple IO threads to TNonblocking Server
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1210737 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index da36045..84e384c 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -26,7 +26,11 @@
#include <transport/TSocket.h>
#include <concurrency/ThreadManager.h>
#include <climits>
+#include <concurrency/Thread.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Mutex.h>
#include <stack>
+#include <vector>
#include <string>
#include <errno.h>
#include <cstdlib>
@@ -35,6 +39,8 @@
#endif
#include <event.h>
+
+
namespace apache { namespace thrift { namespace server {
using apache::thrift::transport::TMemoryBuffer;
@@ -42,6 +48,11 @@
using apache::thrift::protocol::TProtocol;
using apache::thrift::concurrency::Runnable;
using apache::thrift::concurrency::ThreadManager;
+using apache::thrift::concurrency::PosixThreadFactory;
+using apache::thrift::concurrency::ThreadFactory;
+using apache::thrift::concurrency::Thread;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::concurrency::Guard;
#ifdef LIBEVENT_VERSION_NUMBER
#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
@@ -78,9 +89,10 @@
}
/**
- * This is a non-blocking server in C++ for high performance that operates a
- * single IO thread. It assumes that all incoming requests are framed with a
- * 4 byte length indicator and writes out responses using the same framing.
+ * This is a non-blocking server in C++ for high performance that
+ * operates a set of IO threads (by default only one). It assumes that
+ * all incoming requests are framed with a 4 byte length indicator and
+ * writes out responses using the same framing.
*
* It does not use the TServerTransport framework, but rather has socket
* operations hardcoded for use with select.
@@ -95,10 +107,14 @@
T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue */
};
+class TNonblockingIOThread;
+
class TNonblockingServer : public TServer {
private:
class TConnection;
+ friend class TNonblockingIOThread;
+ private:
/// Listen backlog
static const int LISTEN_BACKLOG = 1024;
@@ -123,6 +139,18 @@
/// # of calls before resizing oversized buffers (0 = check only on close)
static const int RESIZE_BUFFER_EVERY_N = 512;
+ /// # of IO threads to use by default
+ static const int DEFAULT_IO_THREADS = 1;
+
+ /// File descriptor of an invalid socket
+ static const int INVALID_SOCKET = -1;
+
+ /// # of IO threads this server will use
+ size_t numIOThreads_;
+
+ /// Whether to set high scheduling priority for IO threads
+ bool useHighPriorityIOThreads_;
+
/// Server socket file descriptor
int serverSocket_;
@@ -135,15 +163,17 @@
/// Is thread pool processing?
bool threadPoolProcessing_;
- /// The event base for libevent
- event_base* eventBase_;
- bool ownEventBase_;
+ // Factory to create the IO threads
+ boost::shared_ptr<PosixThreadFactory> ioThreadFactory_;
- /// Event struct, used with eventBase_ for connection events
- struct event serverEvent_;
+ // Vector of IOThread objects that will handle our IO
+ std::vector<boost::shared_ptr<TNonblockingIOThread> > ioThreads_;
- /// Event struct, used with eventBase_ for task completion notification
- struct event notificationEvent_;
+ // Index of next IO Thread to be used (for round-robin)
+ int nextIOThread_;
+
+ // Synchronizes access to connection stack and similar data
+ Mutex connMutex_;
/// Number of TConnection object we've created
size_t numTConnections_;
@@ -211,9 +241,6 @@
/// Count of connections dropped on overload since server started
uint64_t nTotalConnectionsDropped_;
- /// File descriptors for pipe used for task completion notification.
- evutil_socket_t notificationPipeFDs_[2];
-
/**
* This is a stack of all the objects that have been created but that
* are NOT currently in use. When we close a connection, we place it on this
@@ -234,10 +261,11 @@
void init(int port) {
serverSocket_ = -1;
+ numIOThreads_ = DEFAULT_IO_THREADS;
+ nextIOThread_ = 0;
+ useHighPriorityIOThreads_ = false;
port_ = port;
threadPoolProcessing_ = false;
- eventBase_ = NULL;
- ownEventBase_ = false;
numTConnections_ = 0;
numActiveProcessors_ = 0;
connectionStackLimit_ = CONNECTION_STACK_LIMIT;
@@ -360,6 +388,31 @@
}
/**
+ * Sets the number of IO threads used by this server. Can only be used before
+ * the call to serve() and has no effect afterwards. We always use a
+ * PosixThreadFactory for the IO worker threads, because they must joinable
+ * for clean shutdown.
+ */
+ void setNumIOThreads(size_t numThreads) {
+ numIOThreads_ = numThreads;
+ }
+
+ /** Return whether the IO threads will get high scheduling priority */
+ bool useHighPriorityIOThreads() const {
+ return useHighPriorityIOThreads_;
+ }
+
+ /** Set whether the IO threads will get high scheduling priority. */
+ void setUseHighPriorityIOThreads(bool val) {
+ useHighPriorityIOThreads_ = val;
+ }
+
+ /** Return the number of IO threads used by this server. */
+ size_t getNumIOThreads() const {
+ return numIOThreads_;
+ }
+
+ /**
* Get the maximum number of unused TConnection we will hold in reserve.
*
* @return the current limit on TConnection pool size.
@@ -385,20 +438,6 @@
threadManager_->add(task, 0LL, taskExpireTime_);
}
- event_base* getEventBase() const {
- return eventBase_;
- }
-
- /// Increment our count of the number of connected sockets.
- void incrementNumConnections() {
- ++numTConnections_;
- }
-
- /// Decrement our count of the number of connected sockets.
- void decrementNumConnections() {
- --numTConnections_;
- }
-
/**
* Return the count of sockets currently connected to.
*
@@ -431,11 +470,13 @@
/// Increment the count of connections currently processing.
void incrementActiveProcessors() {
+ Guard g(connMutex_);
++numActiveProcessors_;
}
/// Decrement the count of connections currently processing.
void decrementActiveProcessors() {
+ Guard g(connMutex_);
if (numActiveProcessors_ > 0) {
--numActiveProcessors_;
}
@@ -615,7 +656,7 @@
idleReadBufferLimit_ = limit;
}
-
+
/**
* Get the maximum size of write buffer allocated to idle TConnection objects.
@@ -659,21 +700,48 @@
resizeBufferEveryN_ = count;
}
+ /**
+ * Main workhorse function, starts up the server listening on a port and
+ * loops over the libevent handler.
+ */
+ void serve();
+ /**
+ * Causes the server to terminate gracefully (can be called from any thread).
+ */
+ void stop();
+ private:
+ /**
+ * Callback function that the threadmanager calls when a task reaches
+ * its expiration time. It is needed to clean up the expired connection.
+ *
+ * @param task the runnable associated with the expired task.
+ */
+ void expireClose(boost::shared_ptr<Runnable> task);
+
+ /// Creates a socket to listen on and binds it to the local port.
+ void createAndListenOnSocket();
+
+ /**
+ * Takes a socket created by createAndListenOnSocket() and sets various
+ * options on it to prepare for use in the server.
+ *
+ * @param fd descriptor of socket to be initialized/
+ */
+ void listenSocket(int fd);
/**
* Return an initialized connection object. Creates or recovers from
* pool a TConnection and initializes it with the provided socket FD
* and flags.
*
* @param socket FD of socket associated with this connection.
- * @param flags initial lib_event flags for this connection.
* @param addr the sockaddr of the client
* @param addrLen the length of addr
* @return pointer to initialized TConnection object.
*/
- TConnection* createConnection(int socket, short flags,
- const sockaddr* addr, socklen_t addrLen);
+ TConnection* createConnection(int socket, const sockaddr* addr,
+ socklen_t addrLen);
/**
* Returns a connection to pool or deletion. If the connection pool
@@ -683,14 +751,67 @@
* @param connection the TConection being returned.
*/
void returnConnection(TConnection* connection);
+};
+class TNonblockingIOThread : public Runnable {
+ public:
+ // Creates an IO thread and sets up the event base. The listenSocket should
+ // be a valid FD on which listen() has already been called. If the
+ // listenSocket is < 0, accepting will not be done.
+ TNonblockingIOThread(TNonblockingServer* server,
+ int number,
+ int listenSocket,
+ bool useHighPriority);
+
+ ~TNonblockingIOThread();
+
+ // Returns the event-base for this thread.
+ event_base* getEventBase() const { return eventBase_; }
+
+ // Returns the server for this thread.
+ TNonblockingServer* getServer() const { return server_; }
+
+ // Returns the number of this IO thread.
+ int getThreadNumber() const { return number_; }
+
+ // Returns the thread id associated with this object. This should
+ // only be called after the thread has been started.
+ pthread_t getThreadId() const { return threadId_; }
+
+ // Returns the send-fd for task complete notifications.
+ int getNotificationSendFD() const { return notificationPipeFDs_[1]; }
+
+ // Returns the read-fd for task complete notifications.
+ int getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
+
+ // Returns the actual thread object associated with this IO thread.
+ boost::shared_ptr<Thread> getThread() const { return thread_; }
+
+ // Sets the actual thread object associated with this IO thread.
+ void setThread(const boost::shared_ptr<Thread>& t) { thread_ = t; }
+
+ // Used by TConnection objects to indicate processing has finished.
+ bool notify(TNonblockingServer::TConnection* conn);
+
+ // Enters the event loop and does not return until a call to stop().
+ virtual void run();
+
+ // Exits the event loop as soon as possible.
+ void stop();
+
+ // Ensures that the event-loop thread is fully finished and shut down.
+ void join();
+
+ private:
/**
- * Callback function that the threadmanager calls when a task reaches
- * its expiration time. It is needed to clean up the expired connection.
+ * C-callable event handler for signaling task completion. Provides a
+ * callback that libevent can understand that will read a connection
+ * object's address from a pipe and call connection->transition() for
+ * that object.
*
- * @param task the runnable associated with the expired task.
+ * @param fd the descriptor the event occurred on.
*/
- void expireClose(boost::shared_ptr<Runnable> task);
+ static void notifyHandler(int fd, short which, void* v);
/**
* C-callable event handler for listener events. Provides a callback
@@ -700,63 +821,57 @@
* @param which the flags associated with the event.
* @param v void* callback arg where we placed TNonblockingServer's "this".
*/
- static void eventHandler(evutil_socket_t fd, short which, void* v) {
+ static void listenHandler(evutil_socket_t fd, short which, void* v) {
((TNonblockingServer*)v)->handleEvent(fd, which);
}
- /// Creates a socket to listen on and binds it to the local port.
- void listenSocket();
+ /// Exits the loop ASAP in case of shutdown or error.
+ void breakLoop(bool error);
- /**
- * Takes a socket created by listenSocket() and sets various options on it
- * to prepare for use in the server.
- *
- * @param fd descriptor of socket to be initialized/
- */
- void listenSocket(int fd);
+ /// Registers the events for the notification & listen sockets
+ void registerEvents();
/// Create the pipe used to notify I/O process of task completion.
void createNotificationPipe();
- /**
- * Get notification pipe send descriptor.
- *
- * @return write fd for pipe.
- */
- evutil_socket_t getNotificationSendFD() const {
- return notificationPipeFDs_[1];
- }
+ /// Unregisters our events for notification and listen sockets.
+ void cleanupEvents();
- /**
- * Get notification pipe receive descriptor.
- *
- * @return read fd of pipe.
- */
- evutil_socket_t getNotificationRecvFD() const {
- return notificationPipeFDs_[0];
- }
+ /// Sets (or clears) high priority scheduling status for the current thread.
+ void setCurrentThreadHighPriority(bool value);
- /**
- * Register the core libevent events onto the proper base.
- *
- * @param base pointer to the event base to be initialized.
- * @param ownEventBase if true, this server is responsible for
- * freeing the event base memory.
- */
- void registerEvents(event_base* base, bool ownEventBase = true);
+ private:
+ /// associated server
+ TNonblockingServer* server_;
- /**
- * Main workhorse function, starts up the server listening on a port and
- * loops over the libevent handler.
- */
- void serve();
+ /// thread number (for debugging).
+ const int number_;
- /**
- * May be called from a separate thread to cause serve() to return.
- */
- void stop();
+ /// The actual physical thread id.
+ pthread_t threadId_;
+
+ /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
+ int listenSocket_;
+
+ /// Sets a high scheduling priority when running
+ bool useHighPriority_;
+
+ /// pointer to eventbase to be used for looping
+ event_base* eventBase_;
+
+ /// Used with eventBase_ for connection events (only in listener thread)
+ struct event serverEvent_;
+
+ /// Used with eventBase_ for task completion notification
+ struct event notificationEvent_;
+
+ /// File descriptors for pipe used for task completion notification.
+ int notificationPipeFDs_[2];
+
+ /// Actual IO Thread
+ boost::shared_ptr<Thread> thread_;
};
}}} // apache::thrift::server
-#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_
+#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_