Add thread pool option to NonblockingServer
Summary: Adds an optional ThreadManager to TNonblockingServer, for use if you want requests processed by a thread pool outside of the I/O thread
Reviewed By: jake luciani, aditya
Test Plan: nb-main.cpp, in the test folder
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665132 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index e4f8346..0ce7ccb 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -10,6 +10,7 @@
#include <Thrift.h>
#include <server/TServer.h>
#include <transport/TTransportUtils.h>
+#include <concurrency/ThreadManager.h>
#include <stack>
#include <event.h>
@@ -17,6 +18,8 @@
using facebook::thrift::transport::TMemoryBuffer;
using facebook::thrift::protocol::TProtocol;
+using facebook::thrift::concurrency::Runnable;
+using facebook::thrift::concurrency::ThreadManager;
// Forward declaration of class
class TConnection;
@@ -46,6 +49,12 @@
// Whether to frame responses
bool frameResponses_;
+ // For processing via thread pool, may be NULL
+ boost::shared_ptr<ThreadManager> threadManager_;
+
+ // Whether thread pool processing is enabled (true iff a non-NULL ThreadManager was set)
+ bool threadPoolProcessing_;
+
/**
* This is a stack of all the objects that have been created but that
* are NOT currently in use. When we close a connection, we place it on this
@@ -66,15 +75,18 @@
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocolFactory> protocolFactory,
- int port) :
+ int port,
+ boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
TServer(processor),
serverSocket_(0),
port_(port),
- frameResponses_(true) {
+ frameResponses_(true),
+ threadManager_(threadManager) {
setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(protocolFactory);
setOutputProtocolFactory(protocolFactory);
+ setThreadManager(threadManager);
}
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
@@ -82,19 +94,35 @@
boost::shared_ptr<TTransportFactory> outputTransportFactory,
boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
- int port) :
+ int port,
+ boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
TServer(processor),
serverSocket_(0),
port_(port),
- frameResponses_(true) {
+ frameResponses_(true),
+ threadManager_(threadManager) {
setInputTransportFactory(inputTransportFactory);
setOutputTransportFactory(outputTransportFactory);
setInputProtocolFactory(inputProtocolFactory);
setOutputProtocolFactory(outputProtocolFactory);
+ setThreadManager(threadManager);
}
~TNonblockingServer() {}
+ void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
+ threadManager_ = threadManager;
+ threadPoolProcessing_ = (threadManager != NULL);
+ }
+
+ bool isThreadPoolProcessing() {
+ return threadPoolProcessing_;
+ }
+
+ void addTask(boost::shared_ptr<Runnable> task) {
+ threadManager_->add(task);
+ }
+
void setFrameResponses(bool frameResponses) {
frameResponses_ = frameResponses;
}
@@ -133,6 +161,7 @@
APP_INIT,
APP_READ_FRAME_SIZE,
APP_READ_REQUEST,
+ APP_WAIT_TASK,
APP_SEND_FRAME_SIZE,
APP_SEND_RESULT
};
@@ -144,6 +173,8 @@
class TConnection {
private:
+ class Task;
+
// Server handle
TNonblockingServer* server_;
@@ -186,6 +217,12 @@
// Frame size
int32_t frameSize_;
+ // Task handle: descriptor that taskHandler checks against its fd and then closes
+ int taskHandle_;
+
+ // Task event
+ struct event taskEvent_;
+
// Transport to read from
boost::shared_ptr<TMemoryBuffer> inputTransport_;
@@ -248,9 +285,19 @@
// Handler wrapper
static void eventHandler(int fd, short which, void* v) {
- assert(fd = ((TConnection*)v)->socket_);
+ assert(fd == ((TConnection*)v)->socket_);
((TConnection*)v)->workSocket();
}
+
+ // Handler wrapper for the task event: closes taskHandle_, then calls transition()
+ static void taskHandler(int fd, short which, void* v) {
+ assert(fd == ((TConnection*)v)->taskHandle_);
+ if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
+ GlobalOutput("TConnection::taskHandler close handle failed, resource leak");
+ }
+ ((TConnection*)v)->transition();
+ }
+
};
}}} // facebook::thrift::server