blob: d9643c33c6041f696f7d5e704eac198044ce1d0c [file] [log] [blame]
Marc Slemko3ea00332006-08-17 01:11:13 +00001#include <concurrency/ThreadManager.h>
2#include <concurrency/PosixThreadFactory.h>
3#include <concurrency/Monitor.h>
4#include <concurrency/Util.h>
5#include <protocol/TBinaryProtocol.h>
6#include <server/TSimpleServer.h>
7#include <server/TThreadPoolServer.h>
8#include <transport/TServerSocket.h>
9#include <transport/TSocket.h>
Mark Sleed7173472006-10-25 19:52:10 +000010#include <transport/TTransportUtils.h>
Aditya Agarwal3950f472006-10-11 02:50:15 +000011#include <transport/TBufferedRouterTransport.h>
Aditya Agarwal3950f472006-10-11 02:50:15 +000012#include <transport/TBufferedFileWriter.h>
13
14#include "Service.h"
Marc Slemko3ea00332006-08-17 01:11:13 +000015
16#include <iostream>
17#include <set>
18#include <stdexcept>
19#include <sstream>
20
21using namespace std;
22
23using namespace facebook::thrift;
24using namespace facebook::thrift::protocol;
25using namespace facebook::thrift::transport;
26using namespace facebook::thrift::server;
27
28using namespace test::stress;
29
Aditya Agarwal3950f472006-10-11 02:50:15 +000030class Server : public ServiceIf {
Marc Slemko3ea00332006-08-17 01:11:13 +000031 public:
Aditya Agarwal3950f472006-10-11 02:50:15 +000032 Server() {};
Marc Slemko3ea00332006-08-17 01:11:13 +000033 void echoVoid() {return;}
Aditya Agarwal3950f472006-10-11 02:50:15 +000034 int8_t echoByte(int8_t arg) {return arg;}
Marc Slemkod97eb612006-08-24 23:37:36 +000035 int32_t echoI32(int32_t arg) {return arg;}
36 int64_t echoI64(int64_t arg) {return arg;}
Marc Slemkob09f5882006-08-23 22:03:34 +000037 string echoString(string arg) {return arg;}
Aditya Agarwal3950f472006-10-11 02:50:15 +000038 vector<int8_t> echoList(vector<int8_t> arg) {return arg;}
39 set<int8_t> echoSet(set<int8_t> arg) {return arg;}
40 map<int8_t, int8_t> echoMap(map<int8_t, int8_t> arg) {return arg;}
Marc Slemko3ea00332006-08-17 01:11:13 +000041};
42
43class ClientThread: public Runnable {
44public:
45
Marc Slemkod97eb612006-08-24 23:37:36 +000046 ClientThread(shared_ptr<TTransport>transport, shared_ptr<ServiceClient> client, Monitor& monitor, size_t& workerCount, size_t loopCount, TType loopType) :
Marc Slemko3ea00332006-08-17 01:11:13 +000047 _transport(transport),
48 _client(client),
49 _monitor(monitor),
50 _workerCount(workerCount),
Marc Slemkod97eb612006-08-24 23:37:36 +000051 _loopCount(loopCount),
52 _loopType(loopType)
Marc Slemko3ea00332006-08-17 01:11:13 +000053 {}
54
55 void run() {
56
57 // Wait for all worker threads to start
58
59 {Synchronized s(_monitor);
60 while(_workerCount == 0) {
61 _monitor.wait();
62 }
63 }
64
65 _startTime = Util::currentTime();
66
67 _transport->open();
68
Marc Slemkod97eb612006-08-24 23:37:36 +000069 switch(_loopType) {
70 case T_VOID: loopEchoVoid(); break;
71 case T_BYTE: loopEchoByte(); break;
Marc Slemkod97eb612006-08-24 23:37:36 +000072 case T_I32: loopEchoI32(); break;
73 case T_I64: loopEchoI64(); break;
Marc Slemkod97eb612006-08-24 23:37:36 +000074 case T_STRING: loopEchoString(); break;
75 default: cerr << "Unexpected loop type" << _loopType << endl; break;
Marc Slemko3ea00332006-08-17 01:11:13 +000076 }
77
78 _endTime = Util::currentTime();
79
80 _transport->close();
81
82 _done = true;
83
84 {Synchronized s(_monitor);
85
86 _workerCount--;
Marc Slemko056f9ba2006-08-17 02:59:05 +000087
Marc Slemko3ea00332006-08-17 01:11:13 +000088 if(_workerCount == 0) {
89
90 _monitor.notify();
91 }
92 }
93 }
Marc Slemkod97eb612006-08-24 23:37:36 +000094
95 void loopEchoVoid() {
96 for(size_t ix = 0; ix < _loopCount; ix++) {
97 _client->echoVoid();
98 }
99 }
100
101 void loopEchoByte() {
102 for(size_t ix = 0; ix < _loopCount; ix++) {
Aditya Agarwal3950f472006-10-11 02:50:15 +0000103 int8_t arg = 1;
104 int8_t result;
Marc Slemkod97eb612006-08-24 23:37:36 +0000105 result =_client->echoByte(arg);
106 assert(result == arg);
107 }
108 }
109
Marc Slemkod97eb612006-08-24 23:37:36 +0000110 void loopEchoI32() {
111 for(size_t ix = 0; ix < _loopCount; ix++) {
Aditya Agarwal3950f472006-10-11 02:50:15 +0000112 int32_t arg = 1;
113 int32_t result;
Marc Slemkod97eb612006-08-24 23:37:36 +0000114 result =_client->echoI32(arg);
115 assert(result == arg);
116 }
117 }
118
119 void loopEchoI64() {
120 for(size_t ix = 0; ix < _loopCount; ix++) {
Aditya Agarwal3950f472006-10-11 02:50:15 +0000121 int64_t arg = 1;
122 int64_t result;
Marc Slemkod97eb612006-08-24 23:37:36 +0000123 result =_client->echoI64(arg);
124 assert(result == arg);
125 }
126 }
Aditya Agarwal3950f472006-10-11 02:50:15 +0000127
Marc Slemkod97eb612006-08-24 23:37:36 +0000128 void loopEchoString() {
129 for(size_t ix = 0; ix < _loopCount; ix++) {
130 string arg = "hello";
131 string result;
132 result =_client->echoString(arg);
133 assert(result == arg);
134 }
135 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000136
Marc Slemko3ea00332006-08-17 01:11:13 +0000137 shared_ptr<TTransport> _transport;
138 shared_ptr<ServiceClient> _client;
139 Monitor& _monitor;
140 size_t& _workerCount;
141 size_t _loopCount;
Marc Slemkod97eb612006-08-24 23:37:36 +0000142 TType _loopType;
Marc Slemko3ea00332006-08-17 01:11:13 +0000143 long long _startTime;
144 long long _endTime;
145 bool _done;
146 Monitor _sleep;
147};
148
Marc Slemko3ea00332006-08-17 01:11:13 +0000149int main(int argc, char **argv) {
150
151 int port = 9090;
152 string serverType = "thread-pool";
153 string protocolType = "binary";
154 size_t workerCount = 4;
155 size_t clientCount = 10;
156 size_t loopCount = 10000;
Marc Slemkod97eb612006-08-24 23:37:36 +0000157 TType loopType = T_VOID;
158 string callName = "echoVoid";
Marc Slemkob09f5882006-08-23 22:03:34 +0000159 bool runServer = true;
Aditya Agarwal3950f472006-10-11 02:50:15 +0000160 bool logRequests = false;
161 string requestLogPath = "./requestlog.tlog";
Marc Slemko3ea00332006-08-17 01:11:13 +0000162
Marc Slemkob09f5882006-08-23 22:03:34 +0000163 ostringstream usage;
Marc Slemko3ea00332006-08-17 01:11:13 +0000164
165 usage <<
Marc Slemkob09f5882006-08-23 22:03:34 +0000166 argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>] [--clients=<client-count>] [--loop=<loop-count>]" << endl <<
167 "\tclients Number of client threads to create - 0 implies no clients, i.e. server only. Default is " << clientCount << endl <<
168 "\thelp Prints this help text." << endl <<
Marc Slemkod97eb612006-08-24 23:37:36 +0000169 "\tcall Service method to call. Default is " << callName << endl <<
Marc Slemkob09f5882006-08-23 22:03:34 +0000170 "\tloop The number of remote thrift calls each client makes. Default is " << loopCount << endl <<
171 "\tport The port the server and clients should bind to for thrift network connections. Default is " << port << endl <<
172 "\tserver Run the Thrift server in this process. Default is " << runServer << endl <<
173 "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl <<
174 "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl <<
Aditya Agarwal3950f472006-10-11 02:50:15 +0000175 "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests << endl <<
Marc Slemkob09f5882006-08-23 22:03:34 +0000176 "\tworkers Number of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl;
Marc Slemko3ea00332006-08-17 01:11:13 +0000177
178 map<string, string> args;
179
180 for(int ix = 1; ix < argc; ix++) {
181
182 string arg(argv[ix]);
183
184 if(arg.compare(0,2, "--") == 0) {
185
186 size_t end = arg.find_first_of("=", 2);
187
Marc Slemko056f9ba2006-08-17 02:59:05 +0000188 string key = string(arg, 2, end - 2);
189
Marc Slemko3ea00332006-08-17 01:11:13 +0000190 if(end != string::npos) {
Marc Slemko056f9ba2006-08-17 02:59:05 +0000191 args[key] = string(arg, end + 1);
Marc Slemko3ea00332006-08-17 01:11:13 +0000192 } else {
Marc Slemko056f9ba2006-08-17 02:59:05 +0000193 args[key] = "true";
Marc Slemko3ea00332006-08-17 01:11:13 +0000194 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000195 } else {
196 throw invalid_argument("Unexcepted command line token: "+arg);
197 }
198 }
199
200 try {
201
Marc Slemkob09f5882006-08-23 22:03:34 +0000202 if(!args["clients"].empty()) {
203 clientCount = atoi(args["clients"].c_str());
204 }
205
206 if(!args["help"].empty()) {
207 cerr << usage.str();
208 return 0;
209 }
210
211 if(!args["loop"].empty()) {
212 loopCount = atoi(args["loop"].c_str());
213 }
214
Marc Slemkod97eb612006-08-24 23:37:36 +0000215 if(!args["call"].empty()) {
216 callName = args["call"];
217 }
218
Marc Slemko3ea00332006-08-17 01:11:13 +0000219 if(!args["port"].empty()) {
220 port = atoi(args["port"].c_str());
221 }
222
Marc Slemkob09f5882006-08-23 22:03:34 +0000223 if(!args["server"].empty()) {
224 runServer = args["server"] == "true";
225 }
226
Aditya Agarwal3950f472006-10-11 02:50:15 +0000227 if(!args["log-request"].empty()) {
228 logRequests = args["log-request"] == "true";
229 }
230
Marc Slemko3ea00332006-08-17 01:11:13 +0000231 if(!args["server-type"].empty()) {
232 serverType = args["server-type"];
233
234 if(serverType == "simple") {
235
236 } else if(serverType == "thread-pool") {
237
238 } else {
239
240 throw invalid_argument("Unknown server type "+serverType);
241 }
242 }
243
244 if(!args["workers"].empty()) {
245 workerCount = atoi(args["workers"].c_str());
246 }
247
Marc Slemko3ea00332006-08-17 01:11:13 +0000248 } catch(exception& e) {
249 cerr << e.what() << endl;
250 cerr << usage;
251 }
252
Marc Slemko3ea00332006-08-17 01:11:13 +0000253 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
254
Marc Slemkob09f5882006-08-23 22:03:34 +0000255 if(runServer) {
Marc Slemko3ea00332006-08-17 01:11:13 +0000256
Marc Slemkob09f5882006-08-23 22:03:34 +0000257 // Dispatcher
Aditya Agarwal3950f472006-10-11 02:50:15 +0000258 shared_ptr<Server> serviceHandler(new Server());
Marc Slemko3ea00332006-08-17 01:11:13 +0000259
Mark Sleed7173472006-10-25 19:52:10 +0000260 shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
Marc Slemko3ea00332006-08-17 01:11:13 +0000261
Marc Slemkob09f5882006-08-23 22:03:34 +0000262 // Transport
263 shared_ptr<TServerSocket> serverSocket(new TServerSocket(port));
Marc Slemko3ea00332006-08-17 01:11:13 +0000264
Aditya Agarwal3950f472006-10-11 02:50:15 +0000265 // Transport Factory
266 shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
267
Mark Sleed7173472006-10-25 19:52:10 +0000268 // Protocol Factory
269 shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
Aditya Agarwal3950f472006-10-11 02:50:15 +0000270
271 if (logRequests) {
272 // initialize the log file
273 shared_ptr<TBufferedFileWriter> bufferedFileWriter(new TBufferedFileWriter(requestLogPath, 1000));
274 bufferedFileWriter->setChunkSize(2 * 1024 * 1024);
275 bufferedFileWriter->setMaxEventSize(1024 * 16);
276
277 transportFactory = shared_ptr<TTransportFactory>(new TBufferedRouterTransportFactory(bufferedFileWriter));
278 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000279
Marc Slemkob09f5882006-08-23 22:03:34 +0000280 shared_ptr<Thread> serverThread;
Marc Slemko3ea00332006-08-17 01:11:13 +0000281
Marc Slemkob09f5882006-08-23 22:03:34 +0000282 if(serverType == "simple") {
283
Mark Sleed7173472006-10-25 19:52:10 +0000284 serverThread = threadFactory->newThread(shared_ptr<TServer>(new TSimpleServer(serviceProcessor, serverSocket, transportFactory, protocolFactory)));
Marc Slemkob09f5882006-08-23 22:03:34 +0000285
286 } else if(serverType == "thread-pool") {
287
288 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
289
290 threadManager->threadFactory(threadFactory);
Mark Sleed7173472006-10-25 19:52:10 +0000291 threadManager->start();
292 serverThread = threadFactory->newThread(shared_ptr<TServer>(new TThreadPoolServer(serviceProcessor, serverSocket, transportFactory, protocolFactory, threadManager)));
Marc Slemkob09f5882006-08-23 22:03:34 +0000293 }
294
295 cerr << "Starting the server on port " << port << endl;
296
297 serverThread->start();
298
299 // If we aren't running clients, just wait forever for external clients
300
Mark Sleed7173472006-10-25 19:52:10 +0000301 if (clientCount == 0) {
Marc Slemkob09f5882006-08-23 22:03:34 +0000302 serverThread->join();
303 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000304 }
305
Mark Sleed7173472006-10-25 19:52:10 +0000306 if (clientCount > 0) {
Marc Slemko3ea00332006-08-17 01:11:13 +0000307
Marc Slemkob09f5882006-08-23 22:03:34 +0000308 Monitor monitor;
Marc Slemko3ea00332006-08-17 01:11:13 +0000309
Marc Slemkob09f5882006-08-23 22:03:34 +0000310 size_t threadCount = 0;
Marc Slemko3ea00332006-08-17 01:11:13 +0000311
Marc Slemkob09f5882006-08-23 22:03:34 +0000312 set<shared_ptr<Thread> > clientThreads;
Marc Slemko3ea00332006-08-17 01:11:13 +0000313
Marc Slemkod97eb612006-08-24 23:37:36 +0000314 if(callName == "echoVoid") { loopType = T_VOID;}
315 else if(callName == "echoByte") { loopType = T_BYTE;}
Marc Slemkod97eb612006-08-24 23:37:36 +0000316 else if(callName == "echoI32") { loopType = T_I32;}
317 else if(callName == "echoI64") { loopType = T_I64;}
Marc Slemkod97eb612006-08-24 23:37:36 +0000318 else if(callName == "echoString") { loopType = T_STRING;}
319 else {throw invalid_argument("Unknown service call "+callName);}
320
Marc Slemkob09f5882006-08-23 22:03:34 +0000321 for(size_t ix = 0; ix < clientCount; ix++) {
Marc Slemko3ea00332006-08-17 01:11:13 +0000322
Marc Slemkob09f5882006-08-23 22:03:34 +0000323 shared_ptr<TSocket> socket(new TSocket("127.0.01", port));
324 shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
Mark Sleed7173472006-10-25 19:52:10 +0000325 shared_ptr<TProtocol> protocol(new TBinaryProtocol(bufferedSocket, bufferedSocket));
326 shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
Marc Slemko3ea00332006-08-17 01:11:13 +0000327
Aditya Agarwal3950f472006-10-11 02:50:15 +0000328 clientThreads.insert(threadFactory->newThread(shared_ptr<ClientThread>(new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType))));
Marc Slemkob09f5882006-08-23 22:03:34 +0000329 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000330
Marc Slemkob09f5882006-08-23 22:03:34 +0000331 for(std::set<shared_ptr<Thread> >::const_iterator thread = clientThreads.begin(); thread != clientThreads.end(); thread++) {
332 (*thread)->start();
333 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000334
Marc Slemkob09f5882006-08-23 22:03:34 +0000335 long long time00;
336 long long time01;
Marc Slemko3ea00332006-08-17 01:11:13 +0000337
Marc Slemkob09f5882006-08-23 22:03:34 +0000338 {Synchronized s(monitor);
339 threadCount = clientCount;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000340
Marc Slemkob09f5882006-08-23 22:03:34 +0000341 cerr << "Launch "<< clientCount << " client threads" << endl;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000342
Marc Slemkob09f5882006-08-23 22:03:34 +0000343 time00 = Util::currentTime();
Marc Slemko056f9ba2006-08-17 02:59:05 +0000344
Marc Slemkob09f5882006-08-23 22:03:34 +0000345 monitor.notifyAll();
346
347 while(threadCount > 0) {
348 monitor.wait();
349 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000350
Marc Slemkob09f5882006-08-23 22:03:34 +0000351 time01 = Util::currentTime();
Marc Slemko3ea00332006-08-17 01:11:13 +0000352 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000353
Marc Slemkob09f5882006-08-23 22:03:34 +0000354 long long firstTime = 9223372036854775807LL;
355 long long lastTime = 0;
Marc Slemko3ea00332006-08-17 01:11:13 +0000356
Marc Slemkob09f5882006-08-23 22:03:34 +0000357 double averageTime = 0;
358 long long minTime = 9223372036854775807LL;
359 long long maxTime = 0;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000360
Marc Slemkob09f5882006-08-23 22:03:34 +0000361 for(set<shared_ptr<Thread> >::iterator ix = clientThreads.begin(); ix != clientThreads.end(); ix++) {
Marc Slemko056f9ba2006-08-17 02:59:05 +0000362
Marc Slemkob09f5882006-08-23 22:03:34 +0000363 shared_ptr<ClientThread> client = dynamic_pointer_cast<ClientThread>((*ix)->runnable());
Marc Slemko056f9ba2006-08-17 02:59:05 +0000364
Marc Slemkob09f5882006-08-23 22:03:34 +0000365 long long delta = client->_endTime - client->_startTime;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000366
Marc Slemkob09f5882006-08-23 22:03:34 +0000367 assert(delta > 0);
Marc Slemko056f9ba2006-08-17 02:59:05 +0000368
Marc Slemkob09f5882006-08-23 22:03:34 +0000369 if(client->_startTime < firstTime) {
370 firstTime = client->_startTime;
371 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000372
Marc Slemkob09f5882006-08-23 22:03:34 +0000373 if(client->_endTime > lastTime) {
374 lastTime = client->_endTime;
375 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000376
Marc Slemkob09f5882006-08-23 22:03:34 +0000377 if(delta < minTime) {
378 minTime = delta;
379 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000380
Marc Slemkob09f5882006-08-23 22:03:34 +0000381 if(delta > maxTime) {
382 maxTime = delta;
383 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000384
Marc Slemkob09f5882006-08-23 22:03:34 +0000385 averageTime+= delta;
386 }
387
388 averageTime /= clientCount;
389
390
391 cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl;
392
393 cerr << "done." << endl;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000394 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000395
Marc Slemko3ea00332006-08-17 01:11:13 +0000396 return 0;
397}