Thrift-1442: TNonblockingServer: Refactor to allow multiple IO Threads
Client: cpp
Patch: Dave Watson
Ads multiple IO threads to TNonblocking Server
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1210737 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
index 70204f1..6924aa6 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
@@ -156,7 +156,13 @@
cause the process to run out of thread resources.
We're beyond the point of throwing an exception. Not clear how
best to handle this. */
- detached_ = pthread_join(pthread_, &ignore) == 0;
+ int res = pthread_join(pthread_, &ignore);
+ detached_ = (res == 0);
+ if (res != 0) {
+ GlobalOutput.printf("PthreadThread::join(): fail with code %d", res);
+ }
+ } else {
+ GlobalOutput.printf("PthreadThread::join(): detached thread");
}
}
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index c331eda..ba029a9 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -24,6 +24,7 @@
#include "TNonblockingServer.h"
#include <concurrency/Exception.h>
#include <transport/TSocket.h>
+#include <concurrency/PosixThreadFactory.h>
#include <iostream>
@@ -50,6 +51,7 @@
#include <errno.h>
#include <assert.h>
+#include <sched.h>
#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
@@ -63,6 +65,7 @@
using namespace std;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransportException;
+using boost::shared_ptr;
/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState {
@@ -94,6 +97,8 @@
*/
class TNonblockingServer::TConnection {
private:
+ /// Server IO Thread handling this connection
+ TNonblockingIOThread* ioThread_;
/// Server handle
TNonblockingServer* server_;
@@ -209,25 +214,25 @@
class Task;
/// Constructor
- TConnection(int socket, short eventFlags, TNonblockingServer *s,
+ TConnection(int socket, TNonblockingIOThread* ioThread,
const sockaddr* addr, socklen_t addrLen) {
readBuffer_ = NULL;
readBufferSize_ = 0;
- // Allocate input and output transports
- // these only need to be allocated once per TConnection (they don't need to be
- // reallocated on init() call)
- inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
- outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
- tSocket_.reset(new TSocket());
+ ioThread_ = ioThread;
+ server_ = ioThread->getServer();
- init(socket, eventFlags, s, addr, addrLen);
- server_->incrementNumConnections();
+ // Allocate input and output transports these only need to be allocated
+ // once per TConnection (they don't need to be reallocated on init() call)
+ inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
+ outputTransport_.reset(new TMemoryBuffer(
+ server_->getWriteBufferDefaultSize()));
+ tSocket_.reset(new TSocket());
+ init(socket, ioThread, addr, addrLen);
}
~TConnection() {
std::free(readBuffer_);
- server_->decrementNumConnections();
}
/**
@@ -239,7 +244,7 @@
void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
/// Initialize
- void init(int socket, short eventFlags, TNonblockingServer *s,
+ void init(int socket, TNonblockingIOThread* ioThread,
const sockaddr* addr, socklen_t addrLen);
/**
@@ -263,60 +268,41 @@
}
/**
- * C-callable event handler for signaling task completion. Provides a
- * callback that libevent can understand that will read a connection
- * object's address from a pipe and call connection->transition() for
- * that object.
- *
- * @param fd the descriptor the event occurred on.
- */
- static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) {
- TConnection* connection;
- ssize_t nBytes;
- while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0))
- == sizeof(TConnection*)) {
- connection->transition();
- }
- if (nBytes > 0) {
- throw TException("TConnection::taskHandler unexpected partial read");
- }
- if (errno != EWOULDBLOCK && errno != EAGAIN) {
- GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
- }
- }
-
- /**
* Notification to server that processing has ended on this request.
* Can be called either when processing is completed or when a waiting
* task has been preemptively terminated (on overload).
*
+ * Don't call this from the IO thread itself.
+ *
* @return true if successful, false if unable to notify (check errno).
*/
- bool notifyServer() {
- TConnection* connection = this;
- if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection),
- sizeof(TConnection*), 0) != sizeof(TConnection*)) {
- return false;
- }
+ bool notifyIOThread() {
+ return ioThread_->notify(this);
+ }
- return true;
+ /*
+ * Returns the number of this connection's currently assigned IO
+ * thread.
+ */
+ int getIOThreadNumber() const {
+ return ioThread_->getThreadNumber();
}
/// Force connection shutdown for this connection.
void forceClose() {
appState_ = APP_CLOSE_CONNECTION;
- if (!notifyServer()) {
+ if (!notifyIOThread()) {
throw TException("TConnection::forceClose: failed write on notify pipe");
}
}
/// return the server this connection was initialized for.
- TNonblockingServer* getServer() {
+ TNonblockingServer* getServer() const {
return server_;
}
/// get state of connection.
- TAppState getState() {
+ TAppState getState() const {
return appState_;
}
@@ -362,19 +348,20 @@
}
}
} catch (const TTransportException& ttx) {
- GlobalOutput.printf("TNonblockingServer client died: %s", ttx.what());
+ GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
} catch (const bad_alloc&) {
- GlobalOutput("TNonblockingServer caught bad_alloc exception.");
+ GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
exit(-1);
} catch (const std::exception& x) {
- GlobalOutput.printf("TNonblockingServer process() exception: %s: %s",
+ GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
typeid(x).name(), x.what());
} catch (...) {
- GlobalOutput("TNonblockingServer uncaught exception.");
+ GlobalOutput.printf(
+ "TNonblockingServer: unknown exception while processing.");
}
// Signal completion back to the libevent thread via a pipe
- if (!connection_->notifyServer()) {
+ if (!connection_->notifyIOThread()) {
throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
}
}
@@ -392,14 +379,15 @@
void* connectionContext_;
};
-void TNonblockingServer::TConnection::init(int socket, short eventFlags,
- TNonblockingServer* s,
+void TNonblockingServer::TConnection::init(int socket,
+ TNonblockingIOThread* ioThread,
const sockaddr* addr,
socklen_t addrLen) {
tSocket_->setSocketFD(socket);
tSocket_->setCachedAddress(addr, addrLen);
- server_ = s;
+ ioThread_ = ioThread;
+ server_ = ioThread->getServer();
appState_ = APP_INIT;
eventFlags_ = 0;
@@ -412,30 +400,31 @@
largestWriteBufferSize_ = 0;
socketState_ = SOCKET_RECV_FRAMING;
- appState_ = APP_INIT;
callsForResize_ = 0;
- // Set flags, which also registers the event
- setFlags(eventFlags);
-
// get input/transports
- factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
- factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
+ factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(
+ inputTransport_);
+ factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(
+ outputTransport_);
// Create protocol
- inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
- outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
+ inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(
+ factoryInputTransport_);
+ outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(
+ factoryOutputTransport_);
// Set up for any server event handler
serverEventHandler_ = server_->getEventHandler();
if (serverEventHandler_ != NULL) {
- connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
+ connectionContext_ = serverEventHandler_->createContext(inputProtocol_,
+ outputProtocol_);
} else {
connectionContext_ = NULL;
}
// Get the processor
- processor_ = s->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
+ processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}
void TNonblockingServer::TConnection::workSocket() {
@@ -500,7 +489,7 @@
return;
}
-
+
if (got > 0) {
// Move along in the buffer
readBufferPos_ += got;
@@ -565,6 +554,9 @@
* to, or finished receiving the data that it needed to.
*/
void TNonblockingServer::TConnection::transition() {
+ // ensure this connection is active right now
+ assert(ioThread_);
+ assert(server_);
// Switch upon the state that we are currently in and move to a new state
switch (appState_) {
@@ -800,7 +792,7 @@
*/
event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
TConnection::eventHandler, this);
- event_base_set(server_->getEventBase(), &event_);
+ event_base_set(ioThread_->getEventBase(), &event_);
// Add the event
if (event_add(&event_, 0) == -1) {
@@ -820,6 +812,7 @@
if (serverEventHandler_ != NULL) {
serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
}
+ ioThread_ = NULL;
// Close the socket
tSocket_->close();
@@ -861,14 +854,6 @@
connectionStack_.pop();
delete connection;
}
-
- if (eventBase_ && ownEventBase_) {
- event_base_free(eventBase_);
- }
-
- if (serverSocket_ >= 0) {
- close(serverSocket_);
- }
}
/**
@@ -876,27 +861,41 @@
* by allocating a new one entirely
*/
TNonblockingServer::TConnection* TNonblockingServer::createConnection(
- int socket, short flags,
- const sockaddr* addr,
- socklen_t addrLen) {
+ int socket, const sockaddr* addr, socklen_t addrLen) {
// Check the stack
+ Guard g(connMutex_);
+
+ // pick an IO thread to handle this connection -- currently round robin
+ assert(nextIOThread_ >= 0);
+ assert(nextIOThread_ < ioThreads_.size());
+ int selectedThreadIdx = nextIOThread_;
+ nextIOThread_ = (nextIOThread_ + 1) % ioThreads_.size();
+
+ TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();
+
+ // Check the connection stack to see if we can re-use
+ TConnection* result = NULL;
if (connectionStack_.empty()) {
- return new TConnection(socket, flags, this, addr, addrLen);
+ result = new TConnection(socket, ioThread, addr, addrLen);
+ ++numTConnections_;
} else {
- TConnection* result = connectionStack_.top();
+ result = connectionStack_.top();
connectionStack_.pop();
- result->init(socket, flags, this, addr, addrLen);
- return result;
+ result->init(socket, ioThread, addr, addrLen);
}
+ return result;
}
/**
* Returns a connection to the stack
*/
void TNonblockingServer::returnConnection(TConnection* connection) {
+ Guard g(connMutex_);
+
if (connectionStackLimit_ &&
(connectionStack_.size() >= connectionStackLimit_)) {
delete connection;
+ --numTConnections_;
} else {
connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
connectionStack_.push(connection);
@@ -927,6 +926,7 @@
while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
// If we're overloaded, take action here
if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
+ Guard g(connMutex_);
nConnectionsDropped_++;
nTotalConnectionsDropped_++;
if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
@@ -940,6 +940,7 @@
}
}
}
+
// Explicitly set this socket to NONBLOCK mode
int flags;
if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
@@ -951,7 +952,7 @@
// Create a new TConnection for this client socket.
TConnection* clientConnection =
- createConnection(clientSocket, EV_READ | EV_PERSIST, addrp, addrLen);
+ createConnection(clientSocket, addrp, addrLen);
// Fail fast if we could not create a TConnection object
if (clientConnection == NULL) {
@@ -960,13 +961,29 @@
return;
}
- // Put this client connection into the proper state
- clientConnection->transition();
+ /*
+ * Either notify the ioThread that is assigned this connection to
+ * start processing, or if it is us, we'll just ask this
+ * connection to do its initial state change here.
+ *
+ * (We need to avoid writing to our own notification pipe, to
+ * avoid possible deadlocks if the pipe is full.)
+ *
+ * The IO thread #0 is the only one that handles these listen
+ * events, so unless the connection has been assigned to thread #0
+ * we know it's not on our thread.
+ */
+ if (clientConnection->getIOThreadNumber() == 0) {
+ clientConnection->transition();
+ } else {
+ clientConnection->notifyIOThread();
+ }
// addrLen is written by the accept() call, so needs to be set before the next call.
addrLen = sizeof(addrStorage);
}
+
// Done looping accept, now we have to make sure the error is due to
// blocking. Any other error is a problem
if (errno != EAGAIN && errno != EWOULDBLOCK) {
@@ -977,8 +994,9 @@
/**
* Creates a socket to listen on and binds it to the local port.
*/
-void TNonblockingServer::listenSocket() {
+void TNonblockingServer::createAndListenOnSocket() {
int s;
+
struct addrinfo hints, *res, *res0;
int error;
@@ -1082,63 +1100,6 @@
serverSocket_ = s;
}
-void TNonblockingServer::createNotificationPipe() {
- if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
- GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
- throw TException("can't create notification pipe");
- }
- if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
- evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
- close(notificationPipeFDs_[0]);
- close(notificationPipeFDs_[1]);
- throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
- }
-}
-
-/**
- * Register the core libevent events onto the proper base.
- */
-void TNonblockingServer::registerEvents(event_base* base, bool ownEventBase) {
- assert(serverSocket_ != -1);
- assert(!eventBase_);
- eventBase_ = base;
- ownEventBase_ = ownEventBase;
-
- // Print some libevent stats
- GlobalOutput.printf("libevent %s method %s",
- event_get_version(),
- event_base_get_method(eventBase_));
-
- // Register the server event
- event_set(&serverEvent_,
- serverSocket_,
- EV_READ | EV_PERSIST,
- TNonblockingServer::eventHandler,
- this);
- event_base_set(eventBase_, &serverEvent_);
-
- // Add the event and start up the server
- if (-1 == event_add(&serverEvent_, 0)) {
- throw TException("TNonblockingServer::serve(): coult not event_add");
- }
- if (threadPoolProcessing_) {
- // Create an event to be notified when a task finishes
- event_set(¬ificationEvent_,
- getNotificationRecvFD(),
- EV_READ | EV_PERSIST,
- TConnection::taskHandler,
- this);
-
- // Attach to the base
- event_base_set(eventBase_, ¬ificationEvent_);
-
- // Add the event and start up the server
- if (-1 == event_add(¬ificationEvent_, 0)) {
- throw TException("TNonblockingServer::serve(): notification event_add fail");
- }
- }
-}
-
void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
threadManager_ = threadManager;
if (threadManager != NULL) {
@@ -1154,14 +1115,15 @@
if (numActiveProcessors_ > maxActiveProcessors_ ||
activeConnections > maxConnections_) {
if (!overloaded_) {
- GlobalOutput.printf("thrift non-blocking server overload condition");
+ GlobalOutput.printf("TNonblockingServer: overload condition begun.");
overloaded_ = true;
}
} else {
if (overloaded_ &&
(numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
(activeConnections <= overloadHysteresis_ * maxConnections_)) {
- GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
+ GlobalOutput.printf("TNonblockingServer: overload ended; "
+ "%u dropped (%llu total)",
nConnectionsDropped_, nTotalConnectionsDropped_);
nConnectionsDropped_ = 0;
overloaded_ = false;
@@ -1189,73 +1151,361 @@
void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
TConnection* connection =
static_cast<TConnection::Task*>(task.get())->getTConnection();
- assert(connection && connection->getServer()
- && connection->getState() == APP_WAIT_TASK);
+ assert(connection && connection->getServer() &&
+ connection->getState() == APP_WAIT_TASK);
connection->forceClose();
}
+void TNonblockingServer::stop() {
+ // Breaks the event loop in all threads so that they end ASAP.
+ for (int i = 0; i < ioThreads_.size(); ++i) {
+ ioThreads_[i]->stop();
+ }
+}
+
/**
* Main workhorse function, starts up the server listening on a port and
* loops over the libevent handler.
*/
void TNonblockingServer::serve() {
- // Init socket
- listenSocket();
+ // init listen socket
+ createAndListenOnSocket();
- if (threadPoolProcessing_) {
- // Init task completion notification pipe
- createNotificationPipe();
+ // set up the IO threads
+ assert(ioThreads_.empty());
+ if (!numIOThreads_) {
+ numIOThreads_ = DEFAULT_IO_THREADS;
}
- // Initialize libevent core
- registerEvents(static_cast<event_base*>(event_base_new()), true);
+ for (int id = 0; id < numIOThreads_; ++id) {
+ // the first IO thread also does the listening on server socket
+ int listenFd = (id == 0 ? serverSocket_ : -1);
- // Run the preServe event
+ shared_ptr<TNonblockingIOThread> thread(
+ new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
+ ioThreads_.push_back(thread);
+ }
+
+ // Notify handler of the preServe event
if (eventHandler_ != NULL) {
eventHandler_->preServe();
}
- // Run libevent engine, invokes calls to eventHandler
- // Only returns if stop() is called.
- event_base_loop(eventBase_, 0);
+ // Start all of our helper IO threads. Note that the threads run forever,
+ // only terminating if stop() is called.
+ assert(ioThreads_.size() == numIOThreads_);
+ assert(ioThreads_.size() > 0);
+
+ GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
+ port_, ioThreads_.size());
+
+ // Launch all the secondary IO threads in separate threads
+ if (ioThreads_.size() > 1) {
+ ioThreadFactory_.reset(new PosixThreadFactory(
+ PosixThreadFactory::OTHER, // scheduler
+ PosixThreadFactory::NORMAL, // priority
+ 1, // stack size (MB)
+ false // detached
+ ));
+
+ assert(ioThreadFactory_.get());
+
+ // intentionally starting at thread 1, not 0
+ for (int i = 1; i < ioThreads_.size(); ++i) {
+ shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
+ ioThreads_[i]->setThread(thread);
+ thread->start();
+ }
+ }
+
+ // Run the primary (listener) IO thread loop in our main thread; this will
+ // only return when the server is shutting down.
+ ioThreads_[0]->run();
+
+ // Ensure all threads are finished before exiting serve()
+ for (int i = 0; i < ioThreads_.size(); ++i) {
+ ioThreads_[i]->join();
+ GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
+ }
}
-void TNonblockingServer::stop() {
- if (!eventBase_) {
- return;
+TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
+ int number,
+ int listenSocket,
+ bool useHighPriority)
+ : server_(server)
+ , number_(number)
+ , listenSocket_(listenSocket)
+ , useHighPriority_(useHighPriority)
+ , eventBase_(NULL) {
+ notificationPipeFDs_[0] = -1;
+ notificationPipeFDs_[1] = -1;
+}
+
+TNonblockingIOThread::~TNonblockingIOThread() {
+ // make sure our associated thread is fully finished
+ join();
+
+ if (eventBase_) {
+ event_base_free(eventBase_);
}
- // Call event_base_loopbreak() to tell libevent to exit the loop
- //
- // (The libevent documentation doesn't explicitly state that this function is
- // safe to call from another thread. However, all it does is set a variable,
- // in the event_base, so it should be fine.)
+ if (listenSocket_ >= 0) {
+ if (0 != close(listenSocket_)) {
+ GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
+ errno);
+ }
+ listenSocket_ = TNonblockingServer::INVALID_SOCKET;
+ }
+
+ for (int i = 0; i < 2; ++i) {
+ if (notificationPipeFDs_[i] >= 0) {
+ if (0 != ::close(notificationPipeFDs_[i])) {
+ GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
+ errno);
+ }
+ notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET;
+ }
+ }
+}
+
+void TNonblockingIOThread::createNotificationPipe() {
+ if (pipe(notificationPipeFDs_) != 0) {
+ GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
+ throw TException("can't create notification pipe");
+ }
+ int flags;
+ if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 ||
+ fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) {
+ close(notificationPipeFDs_[0]);
+ close(notificationPipeFDs_[1]);
+ throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
+ }
+ for (int i = 0; i < 2; ++i) {
+ if ((flags = fcntl(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
+ fcntl(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
+ close(notificationPipeFDs_[0]);
+ close(notificationPipeFDs_[1]);
+ throw TException("TNonblockingServer::createNotificationPipe() "
+ "FD_CLOEXEC");
+ }
+ }
+}
+
+/**
+ * Register the core libevent events onto the proper base.
+ */
+void TNonblockingIOThread::registerEvents() {
+ if (listenSocket_ >= 0) {
+ // Register the server event
+ event_set(&serverEvent_,
+ listenSocket_,
+ EV_READ | EV_PERSIST,
+ TNonblockingIOThread::listenHandler,
+ server_);
+ event_base_set(eventBase_, &serverEvent_);
+
+ // Add the event and start up the server
+ if (-1 == event_add(&serverEvent_, 0)) {
+ throw TException("TNonblockingServer::serve(): "
+ "event_add() failed on server listen event");
+ }
+ GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.",
+ number_);
+ }
+
+ createNotificationPipe();
+
+ // Create an event to be notified when a task finishes
+ event_set(¬ificationEvent_,
+ getNotificationRecvFD(),
+ EV_READ | EV_PERSIST,
+ TNonblockingIOThread::notifyHandler,
+ this);
+
+ // Attach to the base
+ event_base_set(eventBase_, ¬ificationEvent_);
+
+ // Add the event and start up the server
+ if (-1 == event_add(¬ificationEvent_, 0)) {
+ throw TException("TNonblockingServer::serve(): "
+ "event_add() failed on task-done notification event");
+ }
+ GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.",
+ number_);
+}
+
+bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
+ int fd = getNotificationSendFD();
+ if (fd < 0) {
+ return false;
+ }
+
+ const int kSize = sizeof(conn);
+ if (write(fd, &conn, kSize) != kSize) {
+ return false;
+ }
+
+ return true;
+}
+
+/* static */
+void TNonblockingIOThread::notifyHandler(int fd, short which, void* v) {
+ TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v;
+ assert(ioThread);
+
+ while (true) {
+ TNonblockingServer::TConnection* connection = 0;
+ const int kSize = sizeof(connection);
+ int nBytes = read(fd, &connection, kSize);
+ if (nBytes == kSize) {
+ if (connection == NULL) {
+ // this is the command to stop our thread, exit the handler!
+ return;
+ }
+ connection->transition();
+ } else if (nBytes > 0) {
+ // throw away these bytes and hope that next time we get a solid read
+ GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d",
+ nBytes, kSize);
+ ioThread->breakLoop(true);
+ return;
+ } else if (nBytes == 0) {
+ GlobalOutput.printf("notifyHandler: Notify socket closed!");
+ // exit the loop
+ break;
+ } else { // nBytes < 0
+ if (errno != EWOULDBLOCK && errno != EAGAIN) {
+ GlobalOutput.perror(
+ "TNonblocking: notifyHandler read() failed: ", errno);
+ ioThread->breakLoop(true);
+ return;
+ }
+ // exit the loop
+ break;
+ }
+ }
+}
+
+void TNonblockingIOThread::breakLoop(bool error) {
+ if (error) {
+ GlobalOutput.printf(
+ "TNonblockingServer: IO thread #%d exiting with error.", number_);
+ // TODO: figure out something better to do here, but for now kill the
+ // whole process.
+ GlobalOutput.printf("TNonblockingServer: aborting process.");
+ ::abort();
+ }
+
+ // sets a flag so that the loop exits on the next event
event_base_loopbreak(eventBase_);
- // event_base_loopbreak() only causes the loop to exit the next time it wakes
- // up. We need to force it to wake up, in case there are no real events
- // it needs to process.
+ // event_base_loopbreak() only causes the loop to exit the next time
+ // it wakes up. We need to force it to wake up, in case there are
+ // no real events it needs to process.
//
- // Attempt to connect to the server socket. If anything fails,
- // we'll just have to wait until libevent wakes up on its own.
- //
- // First create a socket
- int fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
- if (fd < 0) {
- return;
+ // If we're running in the same thread, we can't use the notify(0)
+ // mechanism to stop the thread, but happily if we're running in the
+ // same thread, this means the thread can't be blocking in the event
+ // loop either.
+ if (!pthread_equal(pthread_self(), threadId_)) {
+ notify(NULL);
+ }
+}
+
+void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
+ // Start out with a standard, low-priority setup for the sched params.
+ struct sched_param sp;
+ bzero((void*) &sp, sizeof(sp));
+ int policy = SCHED_OTHER;
+
+ // If desired, set up high-priority sched params structure.
+ if (value) {
+ // FIFO scheduler, ranked above default SCHED_OTHER queue
+ policy = SCHED_FIFO;
+ // The priority only compares us to other SCHED_FIFO threads, so we
+ // just pick a random priority halfway between min & max.
+ const int priority = (sched_get_priority_max(policy) +
+ sched_get_priority_min(policy)) / 2;
+
+ sp.sched_priority = priority;
}
- // Set up the address
- struct sockaddr_in addr;
- addr.sin_family = AF_INET;
- addr.sin_addr.s_addr = htonl(0x7f000001); // 127.0.0.1
- addr.sin_port = htons(port_);
+ // Actually set the sched params for the current thread.
+ if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
+ GlobalOutput.printf(
+ "TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
+ } else {
+ GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", errno);
+ }
+}
- // Finally do the connect().
- // We don't care about the return value;
- // we're just going to close the socket either way.
- connect(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr));
- close(fd);
+void TNonblockingIOThread::run() {
+ threadId_ = pthread_self();
+
+ assert(eventBase_ == 0);
+ eventBase_ = event_base_new();
+
+ // Print some libevent stats
+ if (number_ == 0) {
+ GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
+ event_get_version(),
+ event_base_get_method(eventBase_));
+ }
+
+
+ registerEvents();
+
+ GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
+ number_);
+
+ if (useHighPriority_) {
+ setCurrentThreadHighPriority(true);
+ }
+
+ // Run libevent engine, never returns, invokes calls to eventHandler
+ event_base_loop(eventBase_, 0);
+
+ if (useHighPriority_) {
+ setCurrentThreadHighPriority(false);
+ }
+
+ // cleans up our registered events
+ cleanupEvents();
+
+ GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!",
+ number_);
+}
+
+void TNonblockingIOThread::cleanupEvents() {
+ // stop the listen socket, if any
+ if (listenSocket_ >= 0) {
+ if (event_del(&serverEvent_) == -1) {
+ GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", errno);
+ }
+ }
+
+ event_del(¬ificationEvent_);
+}
+
+
+void TNonblockingIOThread::stop() {
+ // This should cause the thread to fall out of its event loop ASAP.
+ breakLoop(false);
+}
+
+void TNonblockingIOThread::join() {
+ // If this was a thread created by a factory (not the thread that called
+ // serve()), we join() it to make sure we shut down fully.
+ if (thread_) {
+ try {
+ // Note that it is safe to both join() ourselves twice, as well as join
+ // the current thread as the pthread implementation checks for deadlock.
+ thread_->join();
+ } catch(...) {
+ // swallow everything
+ }
+ }
}
}}} // apache::thrift::server
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index da36045..84e384c 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -26,7 +26,11 @@
#include <transport/TSocket.h>
#include <concurrency/ThreadManager.h>
#include <climits>
+#include <concurrency/Thread.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Mutex.h>
#include <stack>
+#include <vector>
#include <string>
#include <errno.h>
#include <cstdlib>
@@ -35,6 +39,8 @@
#endif
#include <event.h>
+
+
namespace apache { namespace thrift { namespace server {
using apache::thrift::transport::TMemoryBuffer;
@@ -42,6 +48,11 @@
using apache::thrift::protocol::TProtocol;
using apache::thrift::concurrency::Runnable;
using apache::thrift::concurrency::ThreadManager;
+using apache::thrift::concurrency::PosixThreadFactory;
+using apache::thrift::concurrency::ThreadFactory;
+using apache::thrift::concurrency::Thread;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::concurrency::Guard;
#ifdef LIBEVENT_VERSION_NUMBER
#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
@@ -78,9 +89,10 @@
}
/**
- * This is a non-blocking server in C++ for high performance that operates a
- * single IO thread. It assumes that all incoming requests are framed with a
- * 4 byte length indicator and writes out responses using the same framing.
+ * This is a non-blocking server in C++ for high performance that
+ * operates a set of IO threads (by default only one). It assumes that
+ * all incoming requests are framed with a 4 byte length indicator and
+ * writes out responses using the same framing.
*
* It does not use the TServerTransport framework, but rather has socket
* operations hardcoded for use with select.
@@ -95,10 +107,14 @@
T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue */
};
+class TNonblockingIOThread;
+
class TNonblockingServer : public TServer {
private:
class TConnection;
+ friend class TNonblockingIOThread;
+ private:
/// Listen backlog
static const int LISTEN_BACKLOG = 1024;
@@ -123,6 +139,18 @@
/// # of calls before resizing oversized buffers (0 = check only on close)
static const int RESIZE_BUFFER_EVERY_N = 512;
+ /// # of IO threads to use by default
+ static const int DEFAULT_IO_THREADS = 1;
+
+ /// File descriptor of an invalid socket
+ static const int INVALID_SOCKET = -1;
+
+ /// # of IO threads this server will use
+ size_t numIOThreads_;
+
+ /// Whether to set high scheduling priority for IO threads
+ bool useHighPriorityIOThreads_;
+
/// Server socket file descriptor
int serverSocket_;
@@ -135,15 +163,17 @@
/// Is thread pool processing?
bool threadPoolProcessing_;
- /// The event base for libevent
- event_base* eventBase_;
- bool ownEventBase_;
+ // Factory to create the IO threads
+ boost::shared_ptr<PosixThreadFactory> ioThreadFactory_;
- /// Event struct, used with eventBase_ for connection events
- struct event serverEvent_;
+ // Vector of IOThread objects that will handle our IO
+ std::vector<boost::shared_ptr<TNonblockingIOThread> > ioThreads_;
- /// Event struct, used with eventBase_ for task completion notification
- struct event notificationEvent_;
+ // Index of next IO Thread to be used (for round-robin)
+ int nextIOThread_;
+
+ // Synchronizes access to connection stack and similar data
+ Mutex connMutex_;
/// Number of TConnection object we've created
size_t numTConnections_;
@@ -211,9 +241,6 @@
/// Count of connections dropped on overload since server started
uint64_t nTotalConnectionsDropped_;
- /// File descriptors for pipe used for task completion notification.
- evutil_socket_t notificationPipeFDs_[2];
-
/**
* This is a stack of all the objects that have been created but that
* are NOT currently in use. When we close a connection, we place it on this
@@ -234,10 +261,11 @@
void init(int port) {
serverSocket_ = -1;
+ numIOThreads_ = DEFAULT_IO_THREADS;
+ nextIOThread_ = 0;
+ useHighPriorityIOThreads_ = false;
port_ = port;
threadPoolProcessing_ = false;
- eventBase_ = NULL;
- ownEventBase_ = false;
numTConnections_ = 0;
numActiveProcessors_ = 0;
connectionStackLimit_ = CONNECTION_STACK_LIMIT;
@@ -360,6 +388,31 @@
}
/**
+ * Sets the number of IO threads used by this server. Can only be used before
+ * the call to serve() and has no effect afterwards. We always use a
+ * PosixThreadFactory for the IO worker threads, because they must joinable
+ * for clean shutdown.
+ */
+ void setNumIOThreads(size_t numThreads) {
+ numIOThreads_ = numThreads;
+ }
+
+ /** Return whether the IO threads will get high scheduling priority */
+ bool useHighPriorityIOThreads() const {
+ return useHighPriorityIOThreads_;
+ }
+
+ /** Set whether the IO threads will get high scheduling priority. */
+ void setUseHighPriorityIOThreads(bool val) {
+ useHighPriorityIOThreads_ = val;
+ }
+
+ /** Return the number of IO threads used by this server. */
+ size_t getNumIOThreads() const {
+ return numIOThreads_;
+ }
+
+ /**
* Get the maximum number of unused TConnection we will hold in reserve.
*
* @return the current limit on TConnection pool size.
@@ -385,20 +438,6 @@
threadManager_->add(task, 0LL, taskExpireTime_);
}
- event_base* getEventBase() const {
- return eventBase_;
- }
-
- /// Increment our count of the number of connected sockets.
- void incrementNumConnections() {
- ++numTConnections_;
- }
-
- /// Decrement our count of the number of connected sockets.
- void decrementNumConnections() {
- --numTConnections_;
- }
-
/**
* Return the count of sockets currently connected to.
*
@@ -431,11 +470,13 @@
/// Increment the count of connections currently processing.
void incrementActiveProcessors() {
+ Guard g(connMutex_);
++numActiveProcessors_;
}
/// Decrement the count of connections currently processing.
void decrementActiveProcessors() {
+ Guard g(connMutex_);
if (numActiveProcessors_ > 0) {
--numActiveProcessors_;
}
@@ -615,7 +656,7 @@
idleReadBufferLimit_ = limit;
}
-
+
/**
* Get the maximum size of write buffer allocated to idle TConnection objects.
@@ -659,21 +700,48 @@
resizeBufferEveryN_ = count;
}
+ /**
+ * Main workhorse function, starts up the server listening on a port and
+ * loops over the libevent handler.
+ */
+ void serve();
+ /**
+ * Causes the server to terminate gracefully (can be called from any thread).
+ */
+ void stop();
+ private:
+ /**
+ * Callback function that the threadmanager calls when a task reaches
+ * its expiration time. It is needed to clean up the expired connection.
+ *
+ * @param task the runnable associated with the expired task.
+ */
+ void expireClose(boost::shared_ptr<Runnable> task);
+
+ /// Creates a socket to listen on and binds it to the local port.
+ void createAndListenOnSocket();
+
+ /**
+ * Takes a socket created by createAndListenOnSocket() and sets various
+ * options on it to prepare for use in the server.
+ *
+ * @param fd descriptor of socket to be initialized/
+ */
+ void listenSocket(int fd);
/**
* Return an initialized connection object. Creates or recovers from
* pool a TConnection and initializes it with the provided socket FD
* and flags.
*
* @param socket FD of socket associated with this connection.
- * @param flags initial lib_event flags for this connection.
* @param addr the sockaddr of the client
* @param addrLen the length of addr
* @return pointer to initialized TConnection object.
*/
- TConnection* createConnection(int socket, short flags,
- const sockaddr* addr, socklen_t addrLen);
+ TConnection* createConnection(int socket, const sockaddr* addr,
+ socklen_t addrLen);
/**
* Returns a connection to pool or deletion. If the connection pool
@@ -683,14 +751,67 @@
* @param connection the TConection being returned.
*/
void returnConnection(TConnection* connection);
+};
+class TNonblockingIOThread : public Runnable {
+ public:
+ // Creates an IO thread and sets up the event base. The listenSocket should
+ // be a valid FD on which listen() has already been called. If the
+ // listenSocket is < 0, accepting will not be done.
+ TNonblockingIOThread(TNonblockingServer* server,
+ int number,
+ int listenSocket,
+ bool useHighPriority);
+
+ ~TNonblockingIOThread();
+
+ // Returns the event-base for this thread.
+ event_base* getEventBase() const { return eventBase_; }
+
+ // Returns the server for this thread.
+ TNonblockingServer* getServer() const { return server_; }
+
+ // Returns the number of this IO thread.
+ int getThreadNumber() const { return number_; }
+
+ // Returns the thread id associated with this object. This should
+ // only be called after the thread has been started.
+ pthread_t getThreadId() const { return threadId_; }
+
+ // Returns the send-fd for task complete notifications.
+ int getNotificationSendFD() const { return notificationPipeFDs_[1]; }
+
+ // Returns the read-fd for task complete notifications.
+ int getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
+
+ // Returns the actual thread object associated with this IO thread.
+ boost::shared_ptr<Thread> getThread() const { return thread_; }
+
+ // Sets the actual thread object associated with this IO thread.
+ void setThread(const boost::shared_ptr<Thread>& t) { thread_ = t; }
+
+ // Used by TConnection objects to indicate processing has finished.
+ bool notify(TNonblockingServer::TConnection* conn);
+
+ // Enters the event loop and does not return until a call to stop().
+ virtual void run();
+
+ // Exits the event loop as soon as possible.
+ void stop();
+
+ // Ensures that the event-loop thread is fully finished and shut down.
+ void join();
+
+ private:
/**
- * Callback function that the threadmanager calls when a task reaches
- * its expiration time. It is needed to clean up the expired connection.
+ * C-callable event handler for signaling task completion. Provides a
+ * callback that libevent can understand that will read a connection
+ * object's address from a pipe and call connection->transition() for
+ * that object.
*
- * @param task the runnable associated with the expired task.
+ * @param fd the descriptor the event occurred on.
*/
- void expireClose(boost::shared_ptr<Runnable> task);
+ static void notifyHandler(int fd, short which, void* v);
/**
* C-callable event handler for listener events. Provides a callback
@@ -700,63 +821,57 @@
* @param which the flags associated with the event.
* @param v void* callback arg where we placed TNonblockingServer's "this".
*/
- static void eventHandler(evutil_socket_t fd, short which, void* v) {
+ static void listenHandler(evutil_socket_t fd, short which, void* v) {
((TNonblockingServer*)v)->handleEvent(fd, which);
}
- /// Creates a socket to listen on and binds it to the local port.
- void listenSocket();
+ /// Exits the loop ASAP in case of shutdown or error.
+ void breakLoop(bool error);
- /**
- * Takes a socket created by listenSocket() and sets various options on it
- * to prepare for use in the server.
- *
- * @param fd descriptor of socket to be initialized/
- */
- void listenSocket(int fd);
+ /// Registers the events for the notification & listen sockets
+ void registerEvents();
/// Create the pipe used to notify I/O process of task completion.
void createNotificationPipe();
- /**
- * Get notification pipe send descriptor.
- *
- * @return write fd for pipe.
- */
- evutil_socket_t getNotificationSendFD() const {
- return notificationPipeFDs_[1];
- }
+ /// Unregisters our events for notification and listen sockets.
+ void cleanupEvents();
- /**
- * Get notification pipe receive descriptor.
- *
- * @return read fd of pipe.
- */
- evutil_socket_t getNotificationRecvFD() const {
- return notificationPipeFDs_[0];
- }
+ /// Sets (or clears) high priority scheduling status for the current thread.
+ void setCurrentThreadHighPriority(bool value);
- /**
- * Register the core libevent events onto the proper base.
- *
- * @param base pointer to the event base to be initialized.
- * @param ownEventBase if true, this server is responsible for
- * freeing the event base memory.
- */
- void registerEvents(event_base* base, bool ownEventBase = true);
+ private:
+ /// associated server
+ TNonblockingServer* server_;
- /**
- * Main workhorse function, starts up the server listening on a port and
- * loops over the libevent handler.
- */
- void serve();
+ /// thread number (for debugging).
+ const int number_;
- /**
- * May be called from a separate thread to cause serve() to return.
- */
- void stop();
+ /// The actual physical thread id.
+ pthread_t threadId_;
+
+ /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
+ int listenSocket_;
+
+ /// Sets a high scheduling priority when running
+ bool useHighPriority_;
+
+ /// pointer to eventbase to be used for looping
+ event_base* eventBase_;
+
+ /// Used with eventBase_ for connection events (only in listener thread)
+ struct event serverEvent_;
+
+ /// Used with eventBase_ for task completion notification
+ struct event notificationEvent_;
+
+ /// File descriptors for pipe used for task completion notification.
+ int notificationPipeFDs_[2];
+
+ /// Actual IO Thread
+ boost::shared_ptr<Thread> thread_;
};
}}} // apache::thrift::server
-#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_
+#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_