Add thread pool option to NonblockingServer
Summary: If you want requests processed outside of the I/O thread
Reviewed By: jake luciani, aditya
Test Plan: nb-main.cpp, in the test folder
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665132 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 6337806..5c4dc8c 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -6,6 +6,7 @@
#include "TNonblockingServer.h"
+#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
@@ -17,6 +18,50 @@
using namespace facebook::thrift::protocol;
using namespace facebook::thrift::transport;
+using namespace std;
+
+class TConnection::Task: public Runnable {
+ public:
+ Task(boost::shared_ptr<TProcessor> processor,
+ boost::shared_ptr<TProtocol> input,
+ boost::shared_ptr<TProtocol> output,
+ int taskHandle) :
+ processor_(processor),
+ input_(input),
+ output_(output),
+ taskHandle_(taskHandle) {}
+
+ void run() {
+ try {
+ while (processor_->process(input_, output_)) {
+ if (!input_->getTransport()->peek()) {
+ break;
+ }
+ }
+ } catch (TTransportException& ttx) {
+ cerr << "TThreadedServer client died: " << ttx.what() << endl;
+ } catch (TException& x) {
+ cerr << "TThreadedServer exception: " << x.what() << endl;
+ } catch (...) {
+ cerr << "TThreadedServer uncaught exception." << endl;
+ }
+
+ // Signal completion back to the libevent thread via a socketpair
+ int8_t b = 0;
+ if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
+ GlobalOutput("TNonblockingServer::Task: send");
+ }
+ if (-1 == ::close(taskHandle_)) {
+ GlobalOutput("TNonblockingServer::Task: close, possible resource leak");
+ }
+ }
+
+ private:
+ boost::shared_ptr<TProcessor> processor_;
+ boost::shared_ptr<TProtocol> input_;
+ boost::shared_ptr<TProtocol> output_;
+ int taskHandle_;
+};
void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
socket_ = socket;
@@ -34,6 +79,8 @@
socketState_ = SOCKET_RECV;
appState_ = APP_INIT;
+ taskHandle_ = -1;
+
// Set flags, which also registers the event
setFlags(eventFlags);
@@ -168,24 +215,59 @@
// and get back some data from the dispatch function
inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
outputTransport_->resetBuffer();
+
+ if (server_->isThreadPoolProcessing()) {
+ // We are setting up a Task to do this work and we will wait on it
+ int sv[2];
+ if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
+ GlobalOutput("TConnection::socketpair() failed");
+ // Now we will fall through to the APP_WAIT_TASK block with no response
+ } else {
+ // Create task and dispatch to the thread manager
+ boost::shared_ptr<Runnable> task =
+ boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
+ inputProtocol_,
+ outputProtocol_,
+ sv[1]));
+ appState_ = APP_WAIT_TASK;
+ event_set(&taskEvent_,
+ taskHandle_ = sv[0],
+ EV_READ,
+ TConnection::taskHandler,
+ this);
- try {
- // Invoke the processor
- server_->getProcessor()->process(inputProtocol_, outputProtocol_);
- } catch (TTransportException &ttx) {
- fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
- close();
- return;
- } catch (TException &x) {
- fprintf(stderr, "TException: Server::process() %s\n", x.what());
- close();
- return;
- } catch (...) {
- fprintf(stderr, "Server::process() unknown exception\n");
- close();
- return;
+ // Add the event and start up the server
+ if (-1 == event_add(&taskEvent_, 0)) {
+ GlobalOutput("TNonblockingServer::serve(): coult not event_add");
+ return;
+ }
+ server_->addTask(task);
+ return;
+ }
+ } else {
+ try {
+ // Invoke the processor
+ server_->getProcessor()->process(inputProtocol_, outputProtocol_);
+ } catch (TTransportException &ttx) {
+ fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
+ close();
+ return;
+ } catch (TException &x) {
+ fprintf(stderr, "TException: Server::process() %s\n", x.what());
+ close();
+ return;
+ } catch (...) {
+ fprintf(stderr, "Server::process() unknown exception\n");
+ close();
+ return;
+ }
}
+ case APP_WAIT_TASK:
+ // We have now finished processing a task and the result has been written
+ // into the outputTransport_, so we grab its contents and place them into
+ // the writeBuffer_ for actual writing by the libevent thread
+
// Get the result of the operation
outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
@@ -212,7 +294,7 @@
setWrite();
// Try to work the socket immediately
- workSocket();
+ // workSocket();
return;
}
@@ -232,7 +314,7 @@
appState_ = APP_SEND_RESULT;
// Go to work on the socket right away, probably still writeable
- workSocket();
+ // workSocket();
return;
@@ -260,7 +342,7 @@
setRead();
// Try to work the socket right away
- workSocket();
+ // workSocket();
return;
@@ -283,7 +365,7 @@
appState_ = APP_READ_REQUEST;
// Work the socket right away
- workSocket();
+ // workSocket();
return;
@@ -315,13 +397,13 @@
*
* Prepares the event structure &event to be used in future calls to
* event_add() and event_del(). The event will be prepared to call the
- * event_handler using the 'sock' file descriptor to monitor events.
+ * eventHandler using the 'sock' file descriptor to monitor events.
*
* The events can be either EV_READ, EV_WRITE, or both, indicating
* that an application can read or write from the file respectively without
* blocking.
*
- * The event_handler will be called with the file descriptor that triggered
+ * The eventHandler will be called with the file descriptor that triggered
* the event and the type of event which will be one of: EV_TIMEOUT,
* EV_SIGNAL, EV_READ, EV_WRITE.
*
@@ -330,7 +412,7 @@
*
* Once initialized, the &event struct can be used repeatedly with
* event_add() and event_del() and does not need to be reinitialized unless
- * the event_handler and/or the argument to it are to be changed. However,
+ * the eventHandler and/or the argument to it are to be changed. However,
* when an ev structure has been added to libevent using event_add() the
* structure must persist until the event occurs (assuming EV_PERSIST
* is not set) or is removed using event_del(). You may not reuse the same
@@ -516,12 +598,12 @@
this);
// Add the event and start up the server
- if (event_add(&serverEvent, 0) == -1) {
+ if (-1 == event_add(&serverEvent, 0)) {
GlobalOutput("TNonblockingServer::serve(): coult not event_add");
return;
}
- // Run libevent engine, never returns, invokes calls to event_handler
+ // Run libevent engine, never returns, invokes calls to eventHandler
event_loop(0);
}
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index e4f8346..0ce7ccb 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -10,6 +10,7 @@
#include <Thrift.h>
#include <server/TServer.h>
#include <transport/TTransportUtils.h>
+#include <concurrency/ThreadManager.h>
#include <stack>
#include <event.h>
@@ -17,6 +18,8 @@
using facebook::thrift::transport::TMemoryBuffer;
using facebook::thrift::protocol::TProtocol;
+using facebook::thrift::concurrency::Runnable;
+using facebook::thrift::concurrency::ThreadManager;
// Forward declaration of class
class TConnection;
@@ -46,6 +49,12 @@
// Whether to frame responses
bool frameResponses_;
+ // For processing via thread pool, may be NULL
+ boost::shared_ptr<ThreadManager> threadManager_;
+
+ // Is thread pool processing?
+ bool threadPoolProcessing_;
+
/**
* This is a stack of all the objects that have been created but that
* are NOT currently in use. When we close a connection, we place it on this
@@ -66,15 +75,18 @@
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocolFactory> protocolFactory,
- int port) :
+ int port,
+ boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
TServer(processor),
serverSocket_(0),
port_(port),
- frameResponses_(true) {
+ frameResponses_(true),
+ threadManager_(threadManager) {
setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(protocolFactory);
setOutputProtocolFactory(protocolFactory);
+ setThreadManager(threadManager);
}
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
@@ -82,19 +94,35 @@
boost::shared_ptr<TTransportFactory> outputTransportFactory,
boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
- int port) :
+ int port,
+ boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
TServer(processor),
serverSocket_(0),
port_(port),
- frameResponses_(true) {
+ frameResponses_(true),
+ threadManager_(threadManager) {
setInputTransportFactory(inputTransportFactory);
setOutputTransportFactory(outputTransportFactory);
setInputProtocolFactory(inputProtocolFactory);
setOutputProtocolFactory(outputProtocolFactory);
+ setThreadManager(threadManager);
}
~TNonblockingServer() {}
+ void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
+ threadManager_ = threadManager;
+ threadPoolProcessing_ = (threadManager != NULL);
+ }
+
+ bool isThreadPoolProcessing() {
+ return threadPoolProcessing_;
+ }
+
+ void addTask(boost::shared_ptr<Runnable> task) {
+ threadManager_->add(task);
+ }
+
void setFrameResponses(bool frameResponses) {
frameResponses_ = frameResponses;
}
@@ -133,6 +161,7 @@
APP_INIT,
APP_READ_FRAME_SIZE,
APP_READ_REQUEST,
+ APP_WAIT_TASK,
APP_SEND_FRAME_SIZE,
APP_SEND_RESULT
};
@@ -144,6 +173,8 @@
class TConnection {
private:
+ class Task;
+
// Server handle
TNonblockingServer* server_;
@@ -186,6 +217,12 @@
// Frame size
int32_t frameSize_;
+ // Task handle
+ int taskHandle_;
+
+ // Task event
+ struct event taskEvent_;
+
// Transport to read from
boost::shared_ptr<TMemoryBuffer> inputTransport_;
@@ -248,9 +285,19 @@
// Handler wrapper
static void eventHandler(int fd, short which, void* v) {
- assert(fd = ((TConnection*)v)->socket_);
+ assert(fd == ((TConnection*)v)->socket_);
((TConnection*)v)->workSocket();
}
+
+ // Handler wrapper for task block
+ static void taskHandler(int fd, short which, void* v) {
+ assert(fd == ((TConnection*)v)->taskHandle_);
+ if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
+ GlobalOutput("TConnection::taskHandler close handle failed, resource leak");
+ }
+ ((TConnection*)v)->transition();
+ }
+
};
}}} // facebook::thrift::server
diff --git a/lib/cpp/src/transport/TServerSocket.cpp b/lib/cpp/src/transport/TServerSocket.cpp
index 0d25bf0..2f6ce5d 100644
--- a/lib/cpp/src/transport/TServerSocket.cpp
+++ b/lib/cpp/src/transport/TServerSocket.cpp
@@ -68,8 +68,8 @@
intSock1_ = -1;
intSock2_ = -1;
} else {
- intSock1_ = sv[0];
- intSock2_ = sv[1];
+ intSock1_ = sv[1];
+ intSock2_ = sv[0];
}
serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
diff --git a/test/cpp/Makefile.stress b/test/cpp/Makefile.stress
index a0fe4cf..9d79541 100644
--- a/test/cpp/Makefile.stress
+++ b/test/cpp/Makefile.stress
@@ -1,5 +1,5 @@
# Makefile for Thrift test project.
-#
+#
# Author:
# Marc Kwiatkowski <marc@facebook.com>
# Aditya Agarwal <aditya@facebook.com>
@@ -18,8 +18,8 @@
target: all
include_paths = $(thrift_home)/lib/cpp/src \
- $(thrift_home)/lib/cpp \
- $(boost_home)
+ $(thrift_home)/lib/cpp \
+ $(boost_home)
include_flags = $(patsubst %,-I%, $(include_paths))
@@ -32,16 +32,22 @@
LD = g++
# Compiler flags
-DCFL = -Wall -O3 -g -I./gen-cpp $(include_flags) -L$(thrift_home)/lib/cpp/.libs -lthrift -levent
-CFL = -Wall -O3 -I./gen-cpp $(include_flags) -L$(thrift_home)/lib/cpp/.libs -lthrift -levent
+DCFL = -Wall -O3 -g -I./gen-cpp $(include_flags) -L$(thrift_home)/lib/cpp/.libs -lthrift -lthriftnb -levent
+CFL = -Wall -O3 -I./gen-cpp $(include_flags) -L$(thrift_home)/lib/cpp/.libs -lthrift -lthriftnb -levent
-all: stress-test
+all: stress-test stress-test-nb
-debug: stress-test-debug
+debug: stress-test-debug stress-test-debug-nb
stubs: ../StressTest.thrift
$(THRIFT) --cpp --php ../StressTest.thrift
+stress-test-debug-nb: stubs
+ g++ -o stress-test-nb $(DCFL) src/nb-main.cpp ./gen-cpp/Service.cpp gen-cpp/StressTest_types.cpp
+
+stress-test-nb: stubs
+ g++ -o stress-test-nb $(CFL) src/nb-main.cpp ./gen-cpp/Service.cpp gen-cpp/StressTest_types.cpp
+
stress-test-debug: stubs
g++ -o stress-test $(DCFL) src/main.cpp ./gen-cpp/Service.cpp gen-cpp/StressTest_types.cpp
@@ -49,4 +55,4 @@
g++ -o stress-test $(CFL) src/main.cpp ./gen-cpp/Service.cpp gen-cpp/StressTest_types.cpp
clean:
- rm -fr stress-test gen-cpp
+ rm -fr stress-test stress-test-nb gen-cpp
diff --git a/test/cpp/src/nb-main.cpp b/test/cpp/src/nb-main.cpp
new file mode 100644
index 0000000..7e69524
--- /dev/null
+++ b/test/cpp/src/nb-main.cpp
@@ -0,0 +1,484 @@
+#include <concurrency/ThreadManager.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Util.h>
+#include <concurrency/Mutex.h>
+#include <protocol/TBinaryProtocol.h>
+#include <server/TSimpleServer.h>
+#include <server/TThreadPoolServer.h>
+#include <server/TThreadedServer.h>
+#include <server/TNonblockingServer.h>
+#include <transport/TServerSocket.h>
+#include <transport/TSocket.h>
+#include <transport/TTransportUtils.h>
+#include <transport/TFileTransport.h>
+#include <TLogging.h>
+
+#include "Service.h"
+
+#include <boost/shared_ptr.hpp>
+
+#include <iostream>
+#include <set>
+#include <stdexcept>
+#include <sstream>
+
+#include <map>
+#include <ext/hash_map>
+using __gnu_cxx::hash_map;
+using __gnu_cxx::hash;
+
+using namespace std;
+using namespace boost;
+
+using namespace facebook::thrift;
+using namespace facebook::thrift::protocol;
+using namespace facebook::thrift::transport;
+using namespace facebook::thrift::server;
+using namespace facebook::thrift::concurrency;
+
+using namespace test::stress;
+
+struct eqstr {
+ bool operator()(const char* s1, const char* s2) const {
+ return strcmp(s1, s2) == 0;
+ }
+};
+
+struct ltstr {
+ bool operator()(const char* s1, const char* s2) const {
+ return strcmp(s1, s2) < 0;
+ }
+};
+
+
+// typedef hash_map<const char*, int, hash<const char*>, eqstr> count_map;
+typedef map<const char*, int, ltstr> count_map;
+
+class Server : public ServiceIf {
+ public:
+ Server() {}
+
+ void count(const char* method) {
+ MutexMonitor m(lock_);
+ int ct = counts_[method];
+ counts_[method] = ++ct;
+ }
+
+ void echoVoid() {
+ count("echoVoid");
+
+ //Sleep to simulate work
+ struct timeval time_struct;
+ time_struct.tv_sec = 0;
+ time_struct.tv_usec = 5000;
+
+ select( (int) NULL, (fd_set *)NULL, (fd_set *)NULL,(fd_set *)NULL, &time_struct );
+
+
+
+ return;
+ }
+
+ count_map getCount() {
+ MutexMonitor m(lock_);
+ return counts_;
+ }
+
+ int8_t echoByte(const int8_t arg) {return arg;}
+ int32_t echoI32(const int32_t arg) {return arg;}
+ int64_t echoI64(const int64_t arg) {return arg;}
+ void echoString(string& out, const string &arg) {
+ if (arg != "hello") {
+ T_ERROR_ABORT("WRONG STRING!!!!");
+ }
+ out = arg;
+ }
+ void echoList(vector<int8_t> &out, const vector<int8_t> &arg) { out = arg; }
+ void echoSet(set<int8_t> &out, const set<int8_t> &arg) { out = arg; }
+ void echoMap(map<int8_t, int8_t> &out, const map<int8_t, int8_t> &arg) { out = arg; }
+
+private:
+ count_map counts_;
+ Mutex lock_;
+
+};
+
+class ClientThread: public Runnable {
+public:
+
+ ClientThread(shared_ptr<TTransport>transport, shared_ptr<ServiceClient> client, Monitor& monitor, size_t& workerCount, size_t loopCount, TType loopType) :
+ _transport(transport),
+ _client(client),
+ _monitor(monitor),
+ _workerCount(workerCount),
+ _loopCount(loopCount),
+ _loopType(loopType)
+ {}
+
+ void run() {
+
+ // Wait for all worker threads to start
+
+ {Synchronized s(_monitor);
+ while(_workerCount == 0) {
+ _monitor.wait();
+ }
+ }
+
+ _startTime = Util::currentTime();
+
+ _transport->open();
+
+ switch(_loopType) {
+ case T_VOID: loopEchoVoid(); break;
+ case T_BYTE: loopEchoByte(); break;
+ case T_I32: loopEchoI32(); break;
+ case T_I64: loopEchoI64(); break;
+ case T_STRING: loopEchoString(); break;
+ default: cerr << "Unexpected loop type" << _loopType << endl; break;
+ }
+
+ _endTime = Util::currentTime();
+
+ _transport->close();
+
+ _done = true;
+
+ {Synchronized s(_monitor);
+
+ _workerCount--;
+
+ if(_workerCount == 0) {
+
+ _monitor.notify();
+ }
+ }
+ }
+
+ void loopEchoVoid() {
+ for(size_t ix = 0; ix < _loopCount; ix++) {
+ _client->echoVoid();
+ }
+ }
+
+ void loopEchoByte() {
+ for(size_t ix = 0; ix < _loopCount; ix++) {
+ int8_t arg = 1;
+ int8_t result;
+ result =_client->echoByte(arg);
+ assert(result == arg);
+ }
+ }
+
+ void loopEchoI32() {
+ for(size_t ix = 0; ix < _loopCount; ix++) {
+ int32_t arg = 1;
+ int32_t result;
+ result =_client->echoI32(arg);
+ assert(result == arg);
+ }
+ }
+
+ void loopEchoI64() {
+ for(size_t ix = 0; ix < _loopCount; ix++) {
+ int64_t arg = 1;
+ int64_t result;
+ result =_client->echoI64(arg);
+ assert(result == arg);
+ }
+ }
+
+ void loopEchoString() {
+ for(size_t ix = 0; ix < _loopCount; ix++) {
+ string arg = "hello";
+ string result;
+ _client->echoString(result, arg);
+ assert(result == arg);
+ }
+ }
+
+ shared_ptr<TTransport> _transport;
+ shared_ptr<ServiceClient> _client;
+ Monitor& _monitor;
+ size_t& _workerCount;
+ size_t _loopCount;
+ TType _loopType;
+ long long _startTime;
+ long long _endTime;
+ bool _done;
+ Monitor _sleep;
+};
+
+
+int main(int argc, char **argv) {
+
+ int port = 9091;
+ string serverType = "simple";
+ string protocolType = "binary";
+ size_t workerCount = 4;
+ size_t clientCount = 20;
+ size_t loopCount = 50000;
+ TType loopType = T_VOID;
+ string callName = "echoVoid";
+ bool runServer = true;
+ bool logRequests = false;
+ string requestLogPath = "./requestlog.tlog";
+ bool replayRequests = false;
+
+ ostringstream usage;
+
+ usage <<
+ argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>] [--clients=<client-count>] [--loop=<loop-count>]" << endl <<
+ "\tclients Number of client threads to create - 0 implies no clients, i.e. server only. Default is " << clientCount << endl <<
+ "\thelp Prints this help text." << endl <<
+ "\tcall Service method to call. Default is " << callName << endl <<
+ "\tloop The number of remote thrift calls each client makes. Default is " << loopCount << endl <<
+ "\tport The port the server and clients should bind to for thrift network connections. Default is " << port << endl <<
+ "\tserver Run the Thrift server in this process. Default is " << runServer << endl <<
+ "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl <<
+ "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl <<
+ "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests << endl <<
+ "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " << replayRequests << endl <<
+ "\tworkers Number of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl;
+
+
+ map<string, string> args;
+
+ for(int ix = 1; ix < argc; ix++) {
+
+ string arg(argv[ix]);
+
+ if(arg.compare(0,2, "--") == 0) {
+
+ size_t end = arg.find_first_of("=", 2);
+
+ string key = string(arg, 2, end - 2);
+
+ if(end != string::npos) {
+ args[key] = string(arg, end + 1);
+ } else {
+ args[key] = "true";
+ }
+ } else {
+ throw invalid_argument("Unexcepted command line token: "+arg);
+ }
+ }
+
+ try {
+
+ if(!args["clients"].empty()) {
+ clientCount = atoi(args["clients"].c_str());
+ }
+
+ if(!args["help"].empty()) {
+ cerr << usage.str();
+ return 0;
+ }
+
+ if(!args["loop"].empty()) {
+ loopCount = atoi(args["loop"].c_str());
+ }
+
+ if(!args["call"].empty()) {
+ callName = args["call"];
+ }
+
+ if(!args["port"].empty()) {
+ port = atoi(args["port"].c_str());
+ }
+
+ if(!args["server"].empty()) {
+ runServer = args["server"] == "true";
+ }
+
+ if(!args["log-request"].empty()) {
+ logRequests = args["log-request"] == "true";
+ }
+
+ if(!args["replay-request"].empty()) {
+ replayRequests = args["replay-request"] == "true";
+ }
+
+ if(!args["server-type"].empty()) {
+ serverType = args["server-type"];
+ }
+
+ if(!args["workers"].empty()) {
+ workerCount = atoi(args["workers"].c_str());
+ }
+
+ } catch(exception& e) {
+ cerr << e.what() << endl;
+ cerr << usage;
+ }
+
+ shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+
+ // Dispatcher
+ shared_ptr<Server> serviceHandler(new Server());
+
+ if (replayRequests) {
+ shared_ptr<Server> serviceHandler(new Server());
+ shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
+
+ // Transports
+ shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
+ fileTransport->setChunkSize(2 * 1024 * 1024);
+ fileTransport->setMaxEventSize(1024 * 16);
+ fileTransport->seekToEnd();
+
+ // Protocol Factory
+ shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
+
+ TFileProcessor fileProcessor(serviceProcessor,
+ protocolFactory,
+ fileTransport);
+
+ fileProcessor.process(0, true);
+ exit(0);
+ }
+
+
+ if(runServer) {
+
+ shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
+
+ // Protocol Factory
+ shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
+
+ // Transport Factory
+ shared_ptr<TTransportFactory> transportFactory;
+
+ if (logRequests) {
+ // initialize the log file
+ shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
+ fileTransport->setChunkSize(2 * 1024 * 1024);
+ fileTransport->setMaxEventSize(1024 * 16);
+
+ transportFactory =
+ shared_ptr<TTransportFactory>(new TPipedTransportFactory(fileTransport));
+ }
+
+ shared_ptr<Thread> serverThread;
+
+ if(serverType == "simple") {
+
+ serverThread = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory,port)));
+
+ } else if(serverType == "thread-pool") {
+
+ shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
+
+ threadManager->threadFactory(threadFactory);
+ threadManager->start();
+ serverThread = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port, threadManager)));
+ }
+
+ cerr << "Starting the server on port " << port << endl;
+ serverThread->start();
+
+ // If we aren't running clients, just wait forever for external clients
+
+ if (clientCount == 0) {
+ serverThread->join();
+ }
+ }
+
+ if (clientCount > 0) {
+
+ Monitor monitor;
+
+ size_t threadCount = 0;
+
+ set<shared_ptr<Thread> > clientThreads;
+
+ if(callName == "echoVoid") { loopType = T_VOID;}
+ else if(callName == "echoByte") { loopType = T_BYTE;}
+ else if(callName == "echoI32") { loopType = T_I32;}
+ else if(callName == "echoI64") { loopType = T_I64;}
+ else if(callName == "echoString") { loopType = T_STRING;}
+ else {throw invalid_argument("Unknown service call "+callName);}
+
+ for(size_t ix = 0; ix < clientCount; ix++) {
+
+ shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port));
+ shared_ptr<TFramedTransport> framedSocket(new TFramedTransport(socket));
+ shared_ptr<TProtocol> protocol(new TBinaryProtocol(framedSocket));
+ shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
+
+ clientThreads.insert(threadFactory->newThread(shared_ptr<ClientThread>(new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType))));
+ }
+
+ for(std::set<shared_ptr<Thread> >::const_iterator thread = clientThreads.begin(); thread != clientThreads.end(); thread++) {
+ (*thread)->start();
+ }
+
+ long long time00;
+ long long time01;
+
+ {Synchronized s(monitor);
+ threadCount = clientCount;
+
+ cerr << "Launch "<< clientCount << " client threads" << endl;
+
+ time00 = Util::currentTime();
+
+ monitor.notifyAll();
+
+ while(threadCount > 0) {
+ monitor.wait();
+ }
+
+ time01 = Util::currentTime();
+ }
+
+ long long firstTime = 9223372036854775807LL;
+ long long lastTime = 0;
+
+ double averageTime = 0;
+ long long minTime = 9223372036854775807LL;
+ long long maxTime = 0;
+
+ for(set<shared_ptr<Thread> >::iterator ix = clientThreads.begin(); ix != clientThreads.end(); ix++) {
+
+ shared_ptr<ClientThread> client = dynamic_pointer_cast<ClientThread>((*ix)->runnable());
+
+ long long delta = client->_endTime - client->_startTime;
+
+ assert(delta > 0);
+
+ if(client->_startTime < firstTime) {
+ firstTime = client->_startTime;
+ }
+
+ if(client->_endTime > lastTime) {
+ lastTime = client->_endTime;
+ }
+
+ if(delta < minTime) {
+ minTime = delta;
+ }
+
+ if(delta > maxTime) {
+ maxTime = delta;
+ }
+
+ averageTime+= delta;
+ }
+
+ averageTime /= clientCount;
+
+
+ cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl;
+
+ count_map count = serviceHandler->getCount();
+ count_map::iterator iter;
+ for (iter = count.begin(); iter != count.end(); ++iter) {
+ printf("%s => %d\n", iter->first, iter->second);
+ }
+ cerr << "done." << endl;
+ }
+
+ return 0;
+}