Add thread pool option to NonblockingServer
Summary: If you want requests processed outside of the I/O thread
Reviewed By: jake luciani, aditya
Test Plan: nb-main.cpp, in the test folder
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665132 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 6337806..5c4dc8c 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -6,6 +6,7 @@
#include "TNonblockingServer.h"
+#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
@@ -17,6 +18,50 @@
using namespace facebook::thrift::protocol;
using namespace facebook::thrift::transport;
+using namespace std;
+
+class TConnection::Task: public Runnable {
+ public:
+ Task(boost::shared_ptr<TProcessor> processor,
+ boost::shared_ptr<TProtocol> input,
+ boost::shared_ptr<TProtocol> output,
+ int taskHandle) :
+ processor_(processor),
+ input_(input),
+ output_(output),
+ taskHandle_(taskHandle) {}
+
+ void run() {
+ try {
+ while (processor_->process(input_, output_)) {
+ if (!input_->getTransport()->peek()) {
+ break;
+ }
+ }
+ } catch (TTransportException& ttx) {
+ cerr << "TThreadedServer client died: " << ttx.what() << endl;
+ } catch (TException& x) {
+ cerr << "TThreadedServer exception: " << x.what() << endl;
+ } catch (...) {
+ cerr << "TThreadedServer uncaught exception." << endl;
+ }
+
+ // Signal completion back to the libevent thread via a socketpair
+ int8_t b = 0;
+ if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
+ GlobalOutput("TNonblockingServer::Task: send");
+ }
+ if (-1 == ::close(taskHandle_)) {
+ GlobalOutput("TNonblockingServer::Task: close, possible resource leak");
+ }
+ }
+
+ private:
+ boost::shared_ptr<TProcessor> processor_;
+ boost::shared_ptr<TProtocol> input_;
+ boost::shared_ptr<TProtocol> output_;
+ int taskHandle_;
+};
void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
socket_ = socket;
@@ -34,6 +79,8 @@
socketState_ = SOCKET_RECV;
appState_ = APP_INIT;
+ taskHandle_ = -1;
+
// Set flags, which also registers the event
setFlags(eventFlags);
@@ -168,24 +215,59 @@
// and get back some data from the dispatch function
inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
outputTransport_->resetBuffer();
+
+ if (server_->isThreadPoolProcessing()) {
+ // We are setting up a Task to do this work and we will wait on it
+ int sv[2];
+ if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
+ GlobalOutput("TConnection::socketpair() failed");
+ // Now we will fall through to the APP_WAIT_TASK block with no response
+ } else {
+ // Create task and dispatch to the thread manager
+ boost::shared_ptr<Runnable> task =
+ boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
+ inputProtocol_,
+ outputProtocol_,
+ sv[1]));
+ appState_ = APP_WAIT_TASK;
+ event_set(&taskEvent_,
+ taskHandle_ = sv[0],
+ EV_READ,
+ TConnection::taskHandler,
+ this);
- try {
- // Invoke the processor
- server_->getProcessor()->process(inputProtocol_, outputProtocol_);
- } catch (TTransportException &ttx) {
- fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
- close();
- return;
- } catch (TException &x) {
- fprintf(stderr, "TException: Server::process() %s\n", x.what());
- close();
- return;
- } catch (...) {
- fprintf(stderr, "Server::process() unknown exception\n");
- close();
- return;
+ // Add the event and start up the server
+ if (-1 == event_add(&taskEvent_, 0)) {
+ GlobalOutput("TNonblockingServer::serve(): coult not event_add");
+ return;
+ }
+ server_->addTask(task);
+ return;
+ }
+ } else {
+ try {
+ // Invoke the processor
+ server_->getProcessor()->process(inputProtocol_, outputProtocol_);
+ } catch (TTransportException &ttx) {
+ fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
+ close();
+ return;
+ } catch (TException &x) {
+ fprintf(stderr, "TException: Server::process() %s\n", x.what());
+ close();
+ return;
+ } catch (...) {
+ fprintf(stderr, "Server::process() unknown exception\n");
+ close();
+ return;
+ }
}
+ case APP_WAIT_TASK:
+ // We have now finished processing a task and the result has been written
+ // into the outputTransport_, so we grab its contents and place them into
+ // the writeBuffer_ for actual writing by the libevent thread
+
// Get the result of the operation
outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
@@ -212,7 +294,7 @@
setWrite();
// Try to work the socket immediately
- workSocket();
+ // workSocket();
return;
}
@@ -232,7 +314,7 @@
appState_ = APP_SEND_RESULT;
// Go to work on the socket right away, probably still writeable
- workSocket();
+ // workSocket();
return;
@@ -260,7 +342,7 @@
setRead();
// Try to work the socket right away
- workSocket();
+ // workSocket();
return;
@@ -283,7 +365,7 @@
appState_ = APP_READ_REQUEST;
// Work the socket right away
- workSocket();
+ // workSocket();
return;
@@ -315,13 +397,13 @@
*
* Prepares the event structure &event to be used in future calls to
* event_add() and event_del(). The event will be prepared to call the
- * event_handler using the 'sock' file descriptor to monitor events.
+ * eventHandler using the 'sock' file descriptor to monitor events.
*
* The events can be either EV_READ, EV_WRITE, or both, indicating
* that an application can read or write from the file respectively without
* blocking.
*
- * The event_handler will be called with the file descriptor that triggered
+ * The eventHandler will be called with the file descriptor that triggered
* the event and the type of event which will be one of: EV_TIMEOUT,
* EV_SIGNAL, EV_READ, EV_WRITE.
*
@@ -330,7 +412,7 @@
*
* Once initialized, the &event struct can be used repeatedly with
* event_add() and event_del() and does not need to be reinitialized unless
- * the event_handler and/or the argument to it are to be changed. However,
+ * the eventHandler and/or the argument to it are to be changed. However,
* when an ev structure has been added to libevent using event_add() the
* structure must persist until the event occurs (assuming EV_PERSIST
* is not set) or is removed using event_del(). You may not reuse the same
@@ -516,12 +598,12 @@
this);
// Add the event and start up the server
- if (event_add(&serverEvent, 0) == -1) {
+ if (-1 == event_add(&serverEvent, 0)) {
GlobalOutput("TNonblockingServer::serve(): coult not event_add");
return;
}
- // Run libevent engine, never returns, invokes calls to event_handler
+ // Run libevent engine, never returns, invokes calls to eventHandler
event_loop(0);
}
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index e4f8346..0ce7ccb 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -10,6 +10,7 @@
#include <Thrift.h>
#include <server/TServer.h>
#include <transport/TTransportUtils.h>
+#include <concurrency/ThreadManager.h>
#include <stack>
#include <event.h>
@@ -17,6 +18,8 @@
using facebook::thrift::transport::TMemoryBuffer;
using facebook::thrift::protocol::TProtocol;
+using facebook::thrift::concurrency::Runnable;
+using facebook::thrift::concurrency::ThreadManager;
// Forward declaration of class
class TConnection;
@@ -46,6 +49,12 @@
// Whether to frame responses
bool frameResponses_;
+ // For processing via thread pool, may be NULL
+ boost::shared_ptr<ThreadManager> threadManager_;
+
+ // Is thread pool processing?
+ bool threadPoolProcessing_;
+
/**
* This is a stack of all the objects that have been created but that
* are NOT currently in use. When we close a connection, we place it on this
@@ -66,15 +75,18 @@
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocolFactory> protocolFactory,
- int port) :
+ int port,
+ boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
TServer(processor),
serverSocket_(0),
port_(port),
- frameResponses_(true) {
+ frameResponses_(true),
+ threadManager_(threadManager) {
setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(protocolFactory);
setOutputProtocolFactory(protocolFactory);
+ setThreadManager(threadManager);
}
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
@@ -82,19 +94,35 @@
boost::shared_ptr<TTransportFactory> outputTransportFactory,
boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
- int port) :
+ int port,
+ boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
TServer(processor),
serverSocket_(0),
port_(port),
- frameResponses_(true) {
+ frameResponses_(true),
+ threadManager_(threadManager) {
setInputTransportFactory(inputTransportFactory);
setOutputTransportFactory(outputTransportFactory);
setInputProtocolFactory(inputProtocolFactory);
setOutputProtocolFactory(outputProtocolFactory);
+ setThreadManager(threadManager);
}
~TNonblockingServer() {}
+ void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
+ threadManager_ = threadManager;
+ threadPoolProcessing_ = (threadManager != NULL);
+ }
+
+ bool isThreadPoolProcessing() {
+ return threadPoolProcessing_;
+ }
+
+ void addTask(boost::shared_ptr<Runnable> task) {
+ threadManager_->add(task);
+ }
+
void setFrameResponses(bool frameResponses) {
frameResponses_ = frameResponses;
}
@@ -133,6 +161,7 @@
APP_INIT,
APP_READ_FRAME_SIZE,
APP_READ_REQUEST,
+ APP_WAIT_TASK,
APP_SEND_FRAME_SIZE,
APP_SEND_RESULT
};
@@ -144,6 +173,8 @@
class TConnection {
private:
+ class Task;
+
// Server handle
TNonblockingServer* server_;
@@ -186,6 +217,12 @@
// Frame size
int32_t frameSize_;
+ // Task handle
+ int taskHandle_;
+
+ // Task event
+ struct event taskEvent_;
+
// Transport to read from
boost::shared_ptr<TMemoryBuffer> inputTransport_;
@@ -248,9 +285,19 @@
// Handler wrapper
static void eventHandler(int fd, short which, void* v) {
- assert(fd = ((TConnection*)v)->socket_);
+ assert(fd == ((TConnection*)v)->socket_);
((TConnection*)v)->workSocket();
}
+
+ // Handler wrapper for task block
+ static void taskHandler(int fd, short which, void* v) {
+ assert(fd == ((TConnection*)v)->taskHandle_);
+ if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
+ GlobalOutput("TConnection::taskHandler close handle failed, resource leak");
+ }
+ ((TConnection*)v)->transition();
+ }
+
};
}}} // facebook::thrift::server
diff --git a/lib/cpp/src/transport/TServerSocket.cpp b/lib/cpp/src/transport/TServerSocket.cpp
index 0d25bf0..2f6ce5d 100644
--- a/lib/cpp/src/transport/TServerSocket.cpp
+++ b/lib/cpp/src/transport/TServerSocket.cpp
@@ -68,8 +68,8 @@
intSock1_ = -1;
intSock2_ = -1;
} else {
- intSock1_ = sv[0];
- intSock2_ = sv[1];
+ intSock1_ = sv[1];
+ intSock2_ = sv[0];
}
serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);