blob: 69a6eb05db3e22d2a470a94320351110dbd47cfd [file] [log] [blame]
#include <concurrency/ThreadManager.h>
#include <concurrency/PosixThreadFactory.h>
#include <concurrency/Monitor.h>
#include <concurrency/Util.h>
#include <protocol/TBinaryProtocol.h>
#include <server/TSimpleServer.h>
#include <server/TThreadPoolServer.h>
#include <transport/TServerSocket.h>
#include <transport/TSocket.h>
#include <transport/TBufferedTransport.h>
#include "StressTest.h"
#include <iostream>
#include <set>
#include <stdexcept>
#include <sstream>
using namespace std;
using namespace facebook::thrift;
using namespace facebook::thrift::protocol;
using namespace facebook::thrift::transport;
using namespace facebook::thrift::server;
using namespace test::stress;
class Server : public ServiceServerIf {
public:
Server(shared_ptr<TProtocol> protocol) :
ServiceServerIf(protocol) {}
void echoVoid() {return;}
uint8_t echoByte(uint8_t arg) {return arg;}
uint16_t echoU16(uint16_t arg) {return arg;}
uint32_t echoU32(uint32_t arg) {return arg;}
uint64_t echoU64(uint64_t arg) {return arg;}
string echoString(string arg) {return arg;}
list<uint8_t> echoList(list<uint8_t> arg) {return arg;}
set<uint8_t> echoSet(set<uint8_t> arg) {return arg;}
map<uint8_t, uint8_t> echoMap(map<uint8_t, uint8_t> arg) {return arg;}
};
class ClientThread: public Runnable {
public:
ClientThread(shared_ptr<TTransport>transport, shared_ptr<ServiceClient> client, Monitor& monitor, size_t& workerCount, size_t loopCount) :
_transport(transport),
_client(client),
_monitor(monitor),
_workerCount(workerCount),
_loopCount(loopCount)
{}
void run() {
// Wait for all worker threads to start
{Synchronized s(_monitor);
while(_workerCount == 0) {
_monitor.wait();
}
}
_startTime = Util::currentTime();
_transport->open();
//uint64_t arg = 0;
//uint64_t result = 0;
for(size_t ix = 0; ix < _loopCount; ix++) {
// result = _client->echoU64(arg);
// assert(result == arg);
_client->echoVoid();
//arg++;
}
_endTime = Util::currentTime();
_transport->close();
_done = true;
{Synchronized s(_monitor);
_workerCount--;
if(_workerCount == 0) {
_monitor.notify();
}
}
}
shared_ptr<TTransport> _transport;
shared_ptr<ServiceClient> _client;
Monitor& _monitor;
size_t& _workerCount;
size_t _loopCount;
long long _startTime;
long long _endTime;
bool _done;
Monitor _sleep;
};
int main(int argc, char **argv) {
int port = 9090;
string serverType = "thread-pool";
string protocolType = "binary";
size_t workerCount = 4;
size_t clientCount = 10;
size_t loopCount = 10000;
bool runServer = true;
ostringstream usage;
usage <<
argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>] [--clients=<client-count>] [--loop=<loop-count>]" << endl <<
"\tclients Number of client threads to create - 0 implies no clients, i.e. server only. Default is " << clientCount << endl <<
"\thelp Prints this help text." << endl <<
"\tloop The number of remote thrift calls each client makes. Default is " << loopCount << endl <<
"\tport The port the server and clients should bind to for thrift network connections. Default is " << port << endl <<
"\tserver Run the Thrift server in this process. Default is " << runServer << endl <<
"\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl <<
"\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl <<
"\tworkers Number of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl;
map<string, string> args;
for(int ix = 1; ix < argc; ix++) {
string arg(argv[ix]);
if(arg.compare(0,2, "--") == 0) {
size_t end = arg.find_first_of("=", 2);
string key = string(arg, 2, end - 2);
if(end != string::npos) {
args[key] = string(arg, end + 1);
} else {
args[key] = "true";
}
} else {
throw invalid_argument("Unexcepted command line token: "+arg);
}
}
try {
if(!args["clients"].empty()) {
clientCount = atoi(args["clients"].c_str());
}
if(!args["help"].empty()) {
cerr << usage.str();
return 0;
}
if(!args["loop"].empty()) {
loopCount = atoi(args["loop"].c_str());
}
if(!args["port"].empty()) {
port = atoi(args["port"].c_str());
}
if(!args["server"].empty()) {
runServer = args["server"] == "true";
}
if(!args["server-type"].empty()) {
serverType = args["server-type"];
if(serverType == "simple") {
} else if(serverType == "thread-pool") {
} else {
throw invalid_argument("Unknown server type "+serverType);
}
}
if(!args["workers"].empty()) {
workerCount = atoi(args["workers"].c_str());
}
} catch(exception& e) {
cerr << e.what() << endl;
cerr << usage;
}
shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
if(runServer) {
// Dispatcher
shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol);
shared_ptr<Server> server(new Server(binaryProtocol));
// Options
shared_ptr<TServerOptions> serverOptions(new TServerOptions());
// Transport
shared_ptr<TServerSocket> serverSocket(new TServerSocket(port));
// ThreadFactory
shared_ptr<Thread> serverThread;
if(serverType == "simple") {
serverThread = threadFactory->newThread(shared_ptr<Runnable>(new TSimpleServer(server, serverOptions, serverSocket)));
} else if(serverType == "thread-pool") {
shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
threadManager->threadFactory(threadFactory);
threadManager->start();
serverThread = threadFactory->newThread(shared_ptr<TServer>(new TThreadPoolServer(server,
serverOptions,
serverSocket,
threadManager)));
}
cerr << "Starting the server on port " << port << endl;
serverThread->start();
// If we aren't running clients, just wait forever for external clients
if(clientCount == 0) {
serverThread->join();
}
}
if(clientCount > 0) {
Monitor monitor;
size_t threadCount = 0;
set<shared_ptr<Thread> > clientThreads;
for(size_t ix = 0; ix < clientCount; ix++) {
shared_ptr<TSocket> socket(new TSocket("127.0.01", port));
shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol());
shared_ptr<ServiceClient> serviceClient(new ServiceClient(bufferedSocket, binaryProtocol));
clientThreads.insert(threadFactory->newThread(shared_ptr<ClientThread>(new ClientThread(bufferedSocket, serviceClient, monitor, threadCount, loopCount))));
}
for(std::set<shared_ptr<Thread> >::const_iterator thread = clientThreads.begin(); thread != clientThreads.end(); thread++) {
(*thread)->start();
}
long long time00;
long long time01;
{Synchronized s(monitor);
threadCount = clientCount;
cerr << "Launch "<< clientCount << " client threads" << endl;
time00 = Util::currentTime();
monitor.notifyAll();
while(threadCount > 0) {
monitor.wait();
}
time01 = Util::currentTime();
}
long long firstTime = 9223372036854775807LL;
long long lastTime = 0;
double averageTime = 0;
long long minTime = 9223372036854775807LL;
long long maxTime = 0;
for(set<shared_ptr<Thread> >::iterator ix = clientThreads.begin(); ix != clientThreads.end(); ix++) {
shared_ptr<ClientThread> client = dynamic_pointer_cast<ClientThread>((*ix)->runnable());
long long delta = client->_endTime - client->_startTime;
assert(delta > 0);
if(client->_startTime < firstTime) {
firstTime = client->_startTime;
}
if(client->_endTime > lastTime) {
lastTime = client->_endTime;
}
if(delta < minTime) {
minTime = delta;
}
if(delta > maxTime) {
maxTime = delta;
}
averageTime+= delta;
}
averageTime /= clientCount;
cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl;
cerr << "done." << endl;
}
return 0;
}