cpp: TNonblockingServer overload handling and optimizations
- Establish a mechanism for TNonblockingServer to handle overloads by
  limiting the number of connections accepted or in process.
- Provide a framework for further work in handling server overloads.
- Limit memory consumption of connection object pool.
- Drop connections when overloaded.
- Add overload-handling behavior allowing pending tasks to be dropped
  from the front of the task queue (short of terminating running tasks,
  these are the oldest tasks in the system and thus the most likely to
  be past their freshness date). This reduces the chance of spending
  valuable CPU time processing a request on which the client has
  already timed out.
- Use a single persistent pipe() to communicate task completion (see the
  sketch below) instead of constructing and monitoring a new socketpair()
  for every task in the system.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@920664 13f79535-47bb-0310-9956-ffa450edef68
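The mechanical heart of the change is replacing the per-task socketpair() with
one long-lived pipe(): every worker thread writes a small fixed-size record to
the write end when its task completes, and the libevent thread watches the
read end with a single persistent event. The standalone sketch below shows the
shape of that pattern in plain POSIX calls; it is illustrative only, not the
Thrift code itself. FakeConnection is invented for the example, and the
pointer-sized payload is an assumption since the body of notifyServer() is not
visible in this hunk.

#include <cassert>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>

struct FakeConnection {
  int id;
};

static int notifyFds[2];  // one pipe for the whole server lifetime

// Worker-thread side: when a task finishes, write the connection pointer to
// the pipe (the role TConnection::notifyServer() plays in this patch).
static void* worker(void* arg) {
  FakeConnection* conn = static_cast<FakeConnection*>(arg);
  ssize_t n = write(notifyFds[1], &conn, sizeof(conn));
  assert(n == (ssize_t)sizeof(conn));
  return NULL;
}

int main() {
  if (pipe(notifyFds) != 0) {
    std::perror("pipe");
    return 1;
  }

  FakeConnection conn = {42};
  pthread_t tid;
  pthread_create(&tid, NULL, worker, &conn);

  // Event-loop side: in the patch a persistent EV_READ event on the read end
  // fires TConnection::taskHandler(); here we simply block on read(). One
  // fixed-size read identifies which connection's task completed.
  FakeConnection* done = NULL;
  ssize_t n = read(notifyFds[0], &done, sizeof(done));
  assert(n == (ssize_t)sizeof(done));
  std::printf("task finished for connection %d\n", done->id);

  pthread_join(tid, NULL);
  close(notifyFds[0]);
  close(notifyFds[1]);
  return 0;
}

In the actual patch, TConnection::notifyServer() plays the worker-side role
and TConnection::taskHandler(), registered once in registerEvents() with
EV_READ | EV_PERSIST, plays the reader-side role.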
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 5375387..a9c1c15 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -45,11 +45,11 @@
Task(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocol> input,
boost::shared_ptr<TProtocol> output,
- int taskHandle) :
+ TConnection* connection) :
processor_(processor),
input_(input),
output_(output),
- taskHandle_(taskHandle) {}
+ connection_(connection) {}
void run() {
try {
@@ -66,21 +66,21 @@
cerr << "TNonblockingServer uncaught exception." << endl;
}
- // Signal completion back to the libevent thread via a socketpair
- int8_t b = 0;
- if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
- GlobalOutput.perror("TNonblockingServer::Task: send ", errno);
+ // Signal completion back to the libevent thread via a pipe
+ if (!connection_->notifyServer()) {
+ throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
}
- if (-1 == ::close(taskHandle_)) {
- GlobalOutput.perror("TNonblockingServer::Task: close, possible resource leak ", errno);
- }
+ }
+
+ TConnection* getTConnection() {
+ return connection_;
}
private:
boost::shared_ptr<TProcessor> processor_;
boost::shared_ptr<TProtocol> input_;
boost::shared_ptr<TProtocol> output_;
- int taskHandle_;
+ TConnection* connection_;
};
void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
@@ -99,8 +99,6 @@
socketState_ = SOCKET_RECV;
appState_ = APP_INIT;
- taskHandle_ = -1;
-
// Set flags, which also registers the event
setFlags(eventFlags);
@@ -256,37 +254,20 @@
outputTransport_->getWritePtr(4);
outputTransport_->wroteBytes(4);
+ server_->incrementActiveProcessors();
+
if (server_->isThreadPoolProcessing()) {
// We are setting up a Task to do this work and we will wait on it
- int sv[2];
- if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
- GlobalOutput.perror("TConnection::socketpair() failed ", errno);
- // Now we will fall through to the APP_WAIT_TASK block with no response
- } else {
- // Create task and dispatch to the thread manager
- boost::shared_ptr<Runnable> task =
- boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
- inputProtocol_,
- outputProtocol_,
- sv[1]));
- // The application is now waiting on the task to finish
- appState_ = APP_WAIT_TASK;
- // Create an event to be notified when the task finishes
- event_set(&taskEvent_,
- taskHandle_ = sv[0],
- EV_READ,
- TConnection::taskHandler,
- this);
+ // Create task and dispatch to the thread manager
+ boost::shared_ptr<Runnable> task =
+ boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
+ inputProtocol_,
+ outputProtocol_,
+ this));
+ // The application is now waiting on the task to finish
+ appState_ = APP_WAIT_TASK;
- // Attach to the base
- event_base_set(server_->getEventBase(), &taskEvent_);
-
- // Add the event and start up the server
- if (-1 == event_add(&taskEvent_, 0)) {
- GlobalOutput("TNonblockingServer::serve(): coult not event_add");
- return;
- }
try {
server_->addTask(task);
} catch (IllegalStateException & ise) {
@@ -295,26 +276,28 @@
close();
}
- // Set this connection idle so that libevent doesn't process more
- // data on it while we're still waiting for the threadmanager to
- // finish this task
- setIdle();
- return;
- }
+ // Set this connection idle so that libevent doesn't process more
+ // data on it while we're still waiting for the threadmanager to
+ // finish this task
+ setIdle();
+ return;
} else {
try {
// Invoke the processor
server_->getProcessor()->process(inputProtocol_, outputProtocol_);
} catch (TTransportException &ttx) {
GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
+ server_->decrementActiveProcessors();
close();
return;
} catch (TException &x) {
GlobalOutput.printf("TException: Server::process() %s", x.what());
+ server_->decrementActiveProcessors();
close();
return;
} catch (...) {
GlobalOutput.printf("Server::process() unknown exception");
+ server_->decrementActiveProcessors();
close();
return;
}
@@ -328,6 +311,7 @@
// into the outputTransport_, so we grab its contents and place them into
// the writeBuffer_ for actual writing by the libevent thread
+ server_->decrementActiveProcessors();
// Get the result of the operation
outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
@@ -425,6 +409,11 @@
return;
+ case APP_CLOSE_CONNECTION:
+ server_->decrementActiveProcessors();
+ close();
+ return;
+
default:
GlobalOutput.printf("Unexpected Application State %d", appState_);
assert(0);
@@ -453,7 +442,7 @@
return;
}
- /**
+ /*
* event_set:
*
* Prepares the event structure &event to be used in future calls to
@@ -499,10 +488,10 @@
}
// Close the socket
- if (socket_ > 0) {
+ if (socket_ >= 0) {
::close(socket_);
}
- socket_ = 0;
+ socket_ = -1;
// close any factory produced transports
factoryInputTransport_->close();
@@ -512,7 +501,7 @@
server_->returnConnection(this);
}
-void TConnection::checkIdleBufferMemLimit(uint32_t limit) {
+void TConnection::checkIdleBufferMemLimit(size_t limit) {
if (readBufferSize_ > limit) {
readBufferSize_ = limit;
readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
@@ -572,7 +561,21 @@
// one, this helps us to avoid having to go back into the libevent engine so
// many times
while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
-
+ // If we're overloaded, take action here
+ if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
+ nConnectionsDropped_++;
+ nTotalConnectionsDropped_++;
+ if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
+ close(clientSocket);
+ continue;
+ } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
+ if (!drainPendingTask()) {
+ // Nothing left to discard, so we drop connection instead.
+ close(clientSocket);
+ continue;
+ }
+ }
+ }
// Explicitly set this socket to NONBLOCK mode
int flags;
if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
@@ -707,6 +710,13 @@
serverSocket_ = s;
}
+void TNonblockingServer::createNotificationPipe() {
+ if (pipe(notificationPipeFDs_) != 0) {
+ GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
+ throw TException("can't create notification pipe");
+ }
+}
+
/**
* Register the core libevent events onto the proper base.
*/
@@ -732,6 +742,59 @@
if (-1 == event_add(&serverEvent_, 0)) {
throw TException("TNonblockingServer::serve(): coult not event_add");
}
+ if (threadPoolProcessing_) {
+ // Create an event to be notified when a task finishes
+ event_set(&notificationEvent_,
+ getNotificationRecvFD(),
+ EV_READ | EV_PERSIST,
+ TConnection::taskHandler,
+ this);
+
+ // Attach to the base
+ event_base_set(eventBase_, &notificationEvent_);
+
+ // Add the event and start up the server
+ if (-1 == event_add(&notificationEvent_, 0)) {
+ throw TException("TNonblockingServer::serve(): notification event_add fail");
+ }
+ }
+}
+
+bool TNonblockingServer::serverOverloaded() {
+ size_t activeConnections = numTConnections_ - connectionStack_.size();
+ if (numActiveProcessors_ > maxActiveProcessors_ ||
+ activeConnections > maxConnections_) {
+ if (!overloaded_) {
+ GlobalOutput.printf("thrift non-blocking server overload condition");
+ overloaded_ = true;
+ }
+ } else {
+ if (overloaded_ &&
+ (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
+ (activeConnections <= overloadHysteresis_ * maxConnections_)) {
+ GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
+ nConnectionsDropped_, nTotalConnectionsDropped_);
+ nConnectionsDropped_ = 0;
+ overloaded_ = false;
+ }
+ }
+
+ return overloaded_;
+}
+
+bool TNonblockingServer::drainPendingTask() {
+ if (threadManager_) {
+ boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
+ if (task) {
+ TConnection* connection =
+ static_cast<TConnection::Task*>(task.get())->getTConnection();
+ assert(connection && connection->getServer()
+ && connection->getState() == APP_WAIT_TASK);
+ connection->forceClose();
+ return true;
+ }
+ }
+ return false;
}
/**
@@ -742,6 +805,11 @@
// Init socket
listenSocket();
+ if (threadPoolProcessing_) {
+ // Init task completion notification pipe
+ createNotificationPipe();
+ }
+
// Initialize libevent core
registerEvents(static_cast<event_base*>(event_init()));
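For the overload check itself, serverOverloaded() latches the overloaded_
flag: it is set as soon as active processors or connections exceed their hard
limit, but only cleared once load falls back to overloadHysteresis_ times the
limit, so the server does not flap in and out of overload while load hovers
near the threshold. The small standalone illustration below simplifies the
check to a single metric; the 0.8 hysteresis factor is an assumed value, as
the real default lives in the header, which is not part of this diff.

#include <cstddef>
#include <cstdio>

int main() {
  const size_t maxActiveProcessors = 100;  // hard limit
  const double overloadHysteresis = 0.8;   // assumed default factor
  bool overloaded = false;

  // Simulated samples of numActiveProcessors_ over time.
  const size_t samples[] = {90, 101, 95, 85, 80, 79};
  for (unsigned i = 0; i < sizeof(samples) / sizeof(samples[0]); ++i) {
    size_t active = samples[i];
    if (active > maxActiveProcessors) {
      overloaded = true;                   // trip immediately above the limit
    } else if (overloaded &&
               active <= overloadHysteresis * maxActiveProcessors) {
      overloaded = false;                  // clear only at or below 80
    }
    std::printf("active=%u overloaded=%s\n",
                (unsigned)active, overloaded ? "yes" : "no");
  }
  return 0;
}

With these samples the flag trips at 101 and stays set through 95 and 85,
clearing only when load drops to 80, which is the behavior the latch in
serverOverloaded() is meant to produce.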