Add thread pool option to NonblockingServer
Summary: If you want requests processed outside of the I/O thread
Reviewed By: jake luciani, aditya
Test Plan: nb-main.cpp, in the test folder
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665132 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 6337806..5c4dc8c 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -6,6 +6,7 @@
#include "TNonblockingServer.h"
+#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
@@ -17,6 +18,50 @@
using namespace facebook::thrift::protocol;
using namespace facebook::thrift::transport;
+using namespace std;
+
+class TConnection::Task: public Runnable {
+ public:
+ Task(boost::shared_ptr<TProcessor> processor,
+ boost::shared_ptr<TProtocol> input,
+ boost::shared_ptr<TProtocol> output,
+ int taskHandle) :
+ processor_(processor),
+ input_(input),
+ output_(output),
+ taskHandle_(taskHandle) {}
+
+ void run() {
+ try {
+ while (processor_->process(input_, output_)) {
+ if (!input_->getTransport()->peek()) {
+ break;
+ }
+ }
+ } catch (TTransportException& ttx) {
+ cerr << "TThreadedServer client died: " << ttx.what() << endl;
+ } catch (TException& x) {
+ cerr << "TThreadedServer exception: " << x.what() << endl;
+ } catch (...) {
+ cerr << "TThreadedServer uncaught exception." << endl;
+ }
+
+ // Signal completion back to the libevent thread via a socketpair
+ int8_t b = 0;
+ if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
+ GlobalOutput("TNonblockingServer::Task: send");
+ }
+ if (-1 == ::close(taskHandle_)) {
+ GlobalOutput("TNonblockingServer::Task: close, possible resource leak");
+ }
+ }
+
+ private:
+ boost::shared_ptr<TProcessor> processor_;
+ boost::shared_ptr<TProtocol> input_;
+ boost::shared_ptr<TProtocol> output_;
+ int taskHandle_;
+};
void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
socket_ = socket;
@@ -34,6 +79,8 @@
socketState_ = SOCKET_RECV;
appState_ = APP_INIT;
+ taskHandle_ = -1;
+
// Set flags, which also registers the event
setFlags(eventFlags);
@@ -168,24 +215,59 @@
// and get back some data from the dispatch function
inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
outputTransport_->resetBuffer();
+
+ if (server_->isThreadPoolProcessing()) {
+ // We are setting up a Task to do this work and we will wait on it
+ int sv[2];
+ if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
+ GlobalOutput("TConnection::socketpair() failed");
+ // Now we will fall through to the APP_WAIT_TASK block with no response
+ } else {
+ // Create task and dispatch to the thread manager
+ boost::shared_ptr<Runnable> task =
+ boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
+ inputProtocol_,
+ outputProtocol_,
+ sv[1]));
+ appState_ = APP_WAIT_TASK;
+ event_set(&taskEvent_,
+ taskHandle_ = sv[0],
+ EV_READ,
+ TConnection::taskHandler,
+ this);
- try {
- // Invoke the processor
- server_->getProcessor()->process(inputProtocol_, outputProtocol_);
- } catch (TTransportException &ttx) {
- fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
- close();
- return;
- } catch (TException &x) {
- fprintf(stderr, "TException: Server::process() %s\n", x.what());
- close();
- return;
- } catch (...) {
- fprintf(stderr, "Server::process() unknown exception\n");
- close();
- return;
+ // Add the event and start up the server
+ if (-1 == event_add(&taskEvent_, 0)) {
+ GlobalOutput("TNonblockingServer::serve(): coult not event_add");
+ return;
+ }
+ server_->addTask(task);
+ return;
+ }
+ } else {
+ try {
+ // Invoke the processor
+ server_->getProcessor()->process(inputProtocol_, outputProtocol_);
+ } catch (TTransportException &ttx) {
+ fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
+ close();
+ return;
+ } catch (TException &x) {
+ fprintf(stderr, "TException: Server::process() %s\n", x.what());
+ close();
+ return;
+ } catch (...) {
+ fprintf(stderr, "Server::process() unknown exception\n");
+ close();
+ return;
+ }
}
+ case APP_WAIT_TASK:
+ // We have now finished processing a task and the result has been written
+ // into the outputTransport_, so we grab its contents and place them into
+ // the writeBuffer_ for actual writing by the libevent thread
+
// Get the result of the operation
outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
@@ -212,7 +294,7 @@
setWrite();
// Try to work the socket immediately
- workSocket();
+ // workSocket();
return;
}
@@ -232,7 +314,7 @@
appState_ = APP_SEND_RESULT;
// Go to work on the socket right away, probably still writeable
- workSocket();
+ // workSocket();
return;
@@ -260,7 +342,7 @@
setRead();
// Try to work the socket right away
- workSocket();
+ // workSocket();
return;
@@ -283,7 +365,7 @@
appState_ = APP_READ_REQUEST;
// Work the socket right away
- workSocket();
+ // workSocket();
return;
@@ -315,13 +397,13 @@
*
* Prepares the event structure &event to be used in future calls to
* event_add() and event_del(). The event will be prepared to call the
- * event_handler using the 'sock' file descriptor to monitor events.
+ * eventHandler using the 'sock' file descriptor to monitor events.
*
* The events can be either EV_READ, EV_WRITE, or both, indicating
* that an application can read or write from the file respectively without
* blocking.
*
- * The event_handler will be called with the file descriptor that triggered
+ * The eventHandler will be called with the file descriptor that triggered
* the event and the type of event which will be one of: EV_TIMEOUT,
* EV_SIGNAL, EV_READ, EV_WRITE.
*
@@ -330,7 +412,7 @@
*
* Once initialized, the &event struct can be used repeatedly with
* event_add() and event_del() and does not need to be reinitialized unless
- * the event_handler and/or the argument to it are to be changed. However,
+ * the eventHandler and/or the argument to it are to be changed. However,
* when an ev structure has been added to libevent using event_add() the
* structure must persist until the event occurs (assuming EV_PERSIST
* is not set) or is removed using event_del(). You may not reuse the same
@@ -516,12 +598,12 @@
this);
// Add the event and start up the server
- if (event_add(&serverEvent, 0) == -1) {
+ if (-1 == event_add(&serverEvent, 0)) {
GlobalOutput("TNonblockingServer::serve(): coult not event_add");
return;
}
- // Run libevent engine, never returns, invokes calls to event_handler
+ // Run libevent engine, never returns, invokes calls to eventHandler
event_loop(0);
}