blob: 8344a885af6d94219d24f9cd7838fbb447f33be5 [file] [log] [blame]
Marc Slemko3ea00332006-08-17 01:11:13 +00001#include <concurrency/ThreadManager.h>
2#include <concurrency/PosixThreadFactory.h>
3#include <concurrency/Monitor.h>
4#include <concurrency/Util.h>
Mark Sleeb9ff32a2006-11-16 01:00:24 +00005#include <concurrency/Mutex.h>
Marc Slemko3ea00332006-08-17 01:11:13 +00006#include <protocol/TBinaryProtocol.h>
7#include <server/TSimpleServer.h>
8#include <server/TThreadPoolServer.h>
9#include <transport/TServerSocket.h>
10#include <transport/TSocket.h>
Mark Sleed7173472006-10-25 19:52:10 +000011#include <transport/TTransportUtils.h>
Aditya Agarwal3950f472006-10-11 02:50:15 +000012#include <transport/TBufferedRouterTransport.h>
Aditya Agarwal3950f472006-10-11 02:50:15 +000013#include <transport/TBufferedFileWriter.h>
14
15#include "Service.h"
Marc Slemko3ea00332006-08-17 01:11:13 +000016
17#include <iostream>
18#include <set>
19#include <stdexcept>
20#include <sstream>
21
Mark Sleeb9ff32a2006-11-16 01:00:24 +000022#include <map>
23#include <ext/hash_map>
24using __gnu_cxx::hash_map;
25using __gnu_cxx::hash;
26
Marc Slemko3ea00332006-08-17 01:11:13 +000027using namespace std;
28
29using namespace facebook::thrift;
30using namespace facebook::thrift::protocol;
31using namespace facebook::thrift::transport;
32using namespace facebook::thrift::server;
Mark Sleeb9ff32a2006-11-16 01:00:24 +000033using namespace facebook::thrift::concurrency;
Marc Slemko3ea00332006-08-17 01:11:13 +000034
35using namespace test::stress;
36
Mark Sleeb9ff32a2006-11-16 01:00:24 +000037struct eqstr {
38 bool operator()(const char* s1, const char* s2) const {
39 return strcmp(s1, s2) == 0;
40 }
41};
42
43struct ltstr {
44 bool operator()(const char* s1, const char* s2) const {
45 return strcmp(s1, s2) < 0;
46 }
47};
48
49
50// typedef hash_map<const char*, int, hash<const char*>, eqstr> count_map;
51typedef map<const char*, int, ltstr> count_map;
52
Aditya Agarwal3950f472006-10-11 02:50:15 +000053class Server : public ServiceIf {
Marc Slemko3ea00332006-08-17 01:11:13 +000054 public:
Mark Sleeb9ff32a2006-11-16 01:00:24 +000055 Server() {}
56
57 void count(const char* method) {
58 MutexMonitor m(lock_);
59 int ct = counts_[method];
60 counts_[method] = ++ct;
61 }
62
63 void echoVoid() {
64 count("echoVoid");
65 return;
66 }
67
68 count_map getCount() {
69 MutexMonitor m(lock_);
70 return counts_;
71 }
72
Aditya Agarwal3950f472006-10-11 02:50:15 +000073 int8_t echoByte(int8_t arg) {return arg;}
Marc Slemkod97eb612006-08-24 23:37:36 +000074 int32_t echoI32(int32_t arg) {return arg;}
75 int64_t echoI64(int64_t arg) {return arg;}
Marc Slemkob09f5882006-08-23 22:03:34 +000076 string echoString(string arg) {return arg;}
Aditya Agarwal3950f472006-10-11 02:50:15 +000077 vector<int8_t> echoList(vector<int8_t> arg) {return arg;}
78 set<int8_t> echoSet(set<int8_t> arg) {return arg;}
79 map<int8_t, int8_t> echoMap(map<int8_t, int8_t> arg) {return arg;}
Mark Sleeb9ff32a2006-11-16 01:00:24 +000080
81private:
82 count_map counts_;
83 Mutex lock_;
84
Marc Slemko3ea00332006-08-17 01:11:13 +000085};
86
87class ClientThread: public Runnable {
88public:
89
Marc Slemkod97eb612006-08-24 23:37:36 +000090 ClientThread(shared_ptr<TTransport>transport, shared_ptr<ServiceClient> client, Monitor& monitor, size_t& workerCount, size_t loopCount, TType loopType) :
Marc Slemko3ea00332006-08-17 01:11:13 +000091 _transport(transport),
92 _client(client),
93 _monitor(monitor),
94 _workerCount(workerCount),
Marc Slemkod97eb612006-08-24 23:37:36 +000095 _loopCount(loopCount),
96 _loopType(loopType)
Marc Slemko3ea00332006-08-17 01:11:13 +000097 {}
98
99 void run() {
100
101 // Wait for all worker threads to start
102
103 {Synchronized s(_monitor);
104 while(_workerCount == 0) {
105 _monitor.wait();
106 }
107 }
108
109 _startTime = Util::currentTime();
110
111 _transport->open();
112
Marc Slemkod97eb612006-08-24 23:37:36 +0000113 switch(_loopType) {
114 case T_VOID: loopEchoVoid(); break;
115 case T_BYTE: loopEchoByte(); break;
Marc Slemkod97eb612006-08-24 23:37:36 +0000116 case T_I32: loopEchoI32(); break;
117 case T_I64: loopEchoI64(); break;
Marc Slemkod97eb612006-08-24 23:37:36 +0000118 case T_STRING: loopEchoString(); break;
119 default: cerr << "Unexpected loop type" << _loopType << endl; break;
Marc Slemko3ea00332006-08-17 01:11:13 +0000120 }
121
122 _endTime = Util::currentTime();
123
124 _transport->close();
125
126 _done = true;
127
128 {Synchronized s(_monitor);
129
130 _workerCount--;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000131
Marc Slemko3ea00332006-08-17 01:11:13 +0000132 if(_workerCount == 0) {
133
134 _monitor.notify();
135 }
136 }
137 }
Marc Slemkod97eb612006-08-24 23:37:36 +0000138
139 void loopEchoVoid() {
140 for(size_t ix = 0; ix < _loopCount; ix++) {
141 _client->echoVoid();
142 }
143 }
144
145 void loopEchoByte() {
146 for(size_t ix = 0; ix < _loopCount; ix++) {
Aditya Agarwal3950f472006-10-11 02:50:15 +0000147 int8_t arg = 1;
148 int8_t result;
Marc Slemkod97eb612006-08-24 23:37:36 +0000149 result =_client->echoByte(arg);
150 assert(result == arg);
151 }
152 }
153
Marc Slemkod97eb612006-08-24 23:37:36 +0000154 void loopEchoI32() {
155 for(size_t ix = 0; ix < _loopCount; ix++) {
Aditya Agarwal3950f472006-10-11 02:50:15 +0000156 int32_t arg = 1;
157 int32_t result;
Marc Slemkod97eb612006-08-24 23:37:36 +0000158 result =_client->echoI32(arg);
159 assert(result == arg);
160 }
161 }
162
163 void loopEchoI64() {
164 for(size_t ix = 0; ix < _loopCount; ix++) {
Aditya Agarwal3950f472006-10-11 02:50:15 +0000165 int64_t arg = 1;
166 int64_t result;
Marc Slemkod97eb612006-08-24 23:37:36 +0000167 result =_client->echoI64(arg);
168 assert(result == arg);
169 }
170 }
Aditya Agarwal3950f472006-10-11 02:50:15 +0000171
Marc Slemkod97eb612006-08-24 23:37:36 +0000172 void loopEchoString() {
173 for(size_t ix = 0; ix < _loopCount; ix++) {
174 string arg = "hello";
175 string result;
176 result =_client->echoString(arg);
177 assert(result == arg);
178 }
179 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000180
Marc Slemko3ea00332006-08-17 01:11:13 +0000181 shared_ptr<TTransport> _transport;
182 shared_ptr<ServiceClient> _client;
183 Monitor& _monitor;
184 size_t& _workerCount;
185 size_t _loopCount;
Marc Slemkod97eb612006-08-24 23:37:36 +0000186 TType _loopType;
Marc Slemko3ea00332006-08-17 01:11:13 +0000187 long long _startTime;
188 long long _endTime;
189 bool _done;
190 Monitor _sleep;
191};
192
Marc Slemko3ea00332006-08-17 01:11:13 +0000193int main(int argc, char **argv) {
194
195 int port = 9090;
196 string serverType = "thread-pool";
197 string protocolType = "binary";
198 size_t workerCount = 4;
199 size_t clientCount = 10;
200 size_t loopCount = 10000;
Marc Slemkod97eb612006-08-24 23:37:36 +0000201 TType loopType = T_VOID;
202 string callName = "echoVoid";
Marc Slemkob09f5882006-08-23 22:03:34 +0000203 bool runServer = true;
Aditya Agarwal3950f472006-10-11 02:50:15 +0000204 bool logRequests = false;
205 string requestLogPath = "./requestlog.tlog";
Marc Slemko3ea00332006-08-17 01:11:13 +0000206
Marc Slemkob09f5882006-08-23 22:03:34 +0000207 ostringstream usage;
Marc Slemko3ea00332006-08-17 01:11:13 +0000208
209 usage <<
Marc Slemkob09f5882006-08-23 22:03:34 +0000210 argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>] [--clients=<client-count>] [--loop=<loop-count>]" << endl <<
211 "\tclients Number of client threads to create - 0 implies no clients, i.e. server only. Default is " << clientCount << endl <<
212 "\thelp Prints this help text." << endl <<
Marc Slemkod97eb612006-08-24 23:37:36 +0000213 "\tcall Service method to call. Default is " << callName << endl <<
Marc Slemkob09f5882006-08-23 22:03:34 +0000214 "\tloop The number of remote thrift calls each client makes. Default is " << loopCount << endl <<
215 "\tport The port the server and clients should bind to for thrift network connections. Default is " << port << endl <<
216 "\tserver Run the Thrift server in this process. Default is " << runServer << endl <<
217 "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl <<
218 "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl <<
Aditya Agarwal3950f472006-10-11 02:50:15 +0000219 "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests << endl <<
Marc Slemkob09f5882006-08-23 22:03:34 +0000220 "\tworkers Number of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl;
Marc Slemko3ea00332006-08-17 01:11:13 +0000221
222 map<string, string> args;
223
224 for(int ix = 1; ix < argc; ix++) {
225
226 string arg(argv[ix]);
227
228 if(arg.compare(0,2, "--") == 0) {
229
230 size_t end = arg.find_first_of("=", 2);
231
Marc Slemko056f9ba2006-08-17 02:59:05 +0000232 string key = string(arg, 2, end - 2);
233
Marc Slemko3ea00332006-08-17 01:11:13 +0000234 if(end != string::npos) {
Marc Slemko056f9ba2006-08-17 02:59:05 +0000235 args[key] = string(arg, end + 1);
Marc Slemko3ea00332006-08-17 01:11:13 +0000236 } else {
Marc Slemko056f9ba2006-08-17 02:59:05 +0000237 args[key] = "true";
Marc Slemko3ea00332006-08-17 01:11:13 +0000238 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000239 } else {
240 throw invalid_argument("Unexcepted command line token: "+arg);
241 }
242 }
243
244 try {
245
Marc Slemkob09f5882006-08-23 22:03:34 +0000246 if(!args["clients"].empty()) {
247 clientCount = atoi(args["clients"].c_str());
248 }
249
250 if(!args["help"].empty()) {
251 cerr << usage.str();
252 return 0;
253 }
254
255 if(!args["loop"].empty()) {
256 loopCount = atoi(args["loop"].c_str());
257 }
258
Marc Slemkod97eb612006-08-24 23:37:36 +0000259 if(!args["call"].empty()) {
260 callName = args["call"];
261 }
262
Marc Slemko3ea00332006-08-17 01:11:13 +0000263 if(!args["port"].empty()) {
264 port = atoi(args["port"].c_str());
265 }
266
Marc Slemkob09f5882006-08-23 22:03:34 +0000267 if(!args["server"].empty()) {
268 runServer = args["server"] == "true";
269 }
270
Aditya Agarwal3950f472006-10-11 02:50:15 +0000271 if(!args["log-request"].empty()) {
272 logRequests = args["log-request"] == "true";
273 }
274
Marc Slemko3ea00332006-08-17 01:11:13 +0000275 if(!args["server-type"].empty()) {
276 serverType = args["server-type"];
277
278 if(serverType == "simple") {
279
280 } else if(serverType == "thread-pool") {
281
282 } else {
283
284 throw invalid_argument("Unknown server type "+serverType);
285 }
286 }
287
288 if(!args["workers"].empty()) {
289 workerCount = atoi(args["workers"].c_str());
290 }
291
Marc Slemko3ea00332006-08-17 01:11:13 +0000292 } catch(exception& e) {
293 cerr << e.what() << endl;
294 cerr << usage;
295 }
296
Marc Slemko3ea00332006-08-17 01:11:13 +0000297 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
298
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000299 // Dispatcher
300 shared_ptr<Server> serviceHandler(new Server());
Marc Slemko3ea00332006-08-17 01:11:13 +0000301
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000302 if(runServer) {
Marc Slemko3ea00332006-08-17 01:11:13 +0000303
Mark Sleed7173472006-10-25 19:52:10 +0000304 shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
Marc Slemko3ea00332006-08-17 01:11:13 +0000305
Marc Slemkob09f5882006-08-23 22:03:34 +0000306 // Transport
307 shared_ptr<TServerSocket> serverSocket(new TServerSocket(port));
Marc Slemko3ea00332006-08-17 01:11:13 +0000308
Aditya Agarwal3950f472006-10-11 02:50:15 +0000309 // Transport Factory
310 shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
311
Mark Sleed7173472006-10-25 19:52:10 +0000312 // Protocol Factory
313 shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
Aditya Agarwal3950f472006-10-11 02:50:15 +0000314
315 if (logRequests) {
316 // initialize the log file
317 shared_ptr<TBufferedFileWriter> bufferedFileWriter(new TBufferedFileWriter(requestLogPath, 1000));
318 bufferedFileWriter->setChunkSize(2 * 1024 * 1024);
319 bufferedFileWriter->setMaxEventSize(1024 * 16);
320
321 transportFactory = shared_ptr<TTransportFactory>(new TBufferedRouterTransportFactory(bufferedFileWriter));
322 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000323
Marc Slemkob09f5882006-08-23 22:03:34 +0000324 shared_ptr<Thread> serverThread;
Marc Slemko3ea00332006-08-17 01:11:13 +0000325
Marc Slemkob09f5882006-08-23 22:03:34 +0000326 if(serverType == "simple") {
327
Mark Sleed7173472006-10-25 19:52:10 +0000328 serverThread = threadFactory->newThread(shared_ptr<TServer>(new TSimpleServer(serviceProcessor, serverSocket, transportFactory, protocolFactory)));
Marc Slemkob09f5882006-08-23 22:03:34 +0000329
330 } else if(serverType == "thread-pool") {
331
332 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
333
334 threadManager->threadFactory(threadFactory);
Mark Sleed7173472006-10-25 19:52:10 +0000335 threadManager->start();
336 serverThread = threadFactory->newThread(shared_ptr<TServer>(new TThreadPoolServer(serviceProcessor, serverSocket, transportFactory, protocolFactory, threadManager)));
Marc Slemkob09f5882006-08-23 22:03:34 +0000337 }
338
339 cerr << "Starting the server on port " << port << endl;
340
341 serverThread->start();
342
343 // If we aren't running clients, just wait forever for external clients
344
Mark Sleed7173472006-10-25 19:52:10 +0000345 if (clientCount == 0) {
Marc Slemkob09f5882006-08-23 22:03:34 +0000346 serverThread->join();
347 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000348 }
349
Mark Sleed7173472006-10-25 19:52:10 +0000350 if (clientCount > 0) {
Marc Slemko3ea00332006-08-17 01:11:13 +0000351
Marc Slemkob09f5882006-08-23 22:03:34 +0000352 Monitor monitor;
Marc Slemko3ea00332006-08-17 01:11:13 +0000353
Marc Slemkob09f5882006-08-23 22:03:34 +0000354 size_t threadCount = 0;
Marc Slemko3ea00332006-08-17 01:11:13 +0000355
Marc Slemkob09f5882006-08-23 22:03:34 +0000356 set<shared_ptr<Thread> > clientThreads;
Marc Slemko3ea00332006-08-17 01:11:13 +0000357
Marc Slemkod97eb612006-08-24 23:37:36 +0000358 if(callName == "echoVoid") { loopType = T_VOID;}
359 else if(callName == "echoByte") { loopType = T_BYTE;}
Marc Slemkod97eb612006-08-24 23:37:36 +0000360 else if(callName == "echoI32") { loopType = T_I32;}
361 else if(callName == "echoI64") { loopType = T_I64;}
Marc Slemkod97eb612006-08-24 23:37:36 +0000362 else if(callName == "echoString") { loopType = T_STRING;}
363 else {throw invalid_argument("Unknown service call "+callName);}
364
Marc Slemkob09f5882006-08-23 22:03:34 +0000365 for(size_t ix = 0; ix < clientCount; ix++) {
Marc Slemko3ea00332006-08-17 01:11:13 +0000366
Marc Slemkob09f5882006-08-23 22:03:34 +0000367 shared_ptr<TSocket> socket(new TSocket("127.0.01", port));
368 shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
Mark Sleed7173472006-10-25 19:52:10 +0000369 shared_ptr<TProtocol> protocol(new TBinaryProtocol(bufferedSocket, bufferedSocket));
370 shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
Marc Slemko3ea00332006-08-17 01:11:13 +0000371
Aditya Agarwal3950f472006-10-11 02:50:15 +0000372 clientThreads.insert(threadFactory->newThread(shared_ptr<ClientThread>(new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType))));
Marc Slemkob09f5882006-08-23 22:03:34 +0000373 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000374
Marc Slemkob09f5882006-08-23 22:03:34 +0000375 for(std::set<shared_ptr<Thread> >::const_iterator thread = clientThreads.begin(); thread != clientThreads.end(); thread++) {
376 (*thread)->start();
377 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000378
Marc Slemkob09f5882006-08-23 22:03:34 +0000379 long long time00;
380 long long time01;
Marc Slemko3ea00332006-08-17 01:11:13 +0000381
Marc Slemkob09f5882006-08-23 22:03:34 +0000382 {Synchronized s(monitor);
383 threadCount = clientCount;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000384
Marc Slemkob09f5882006-08-23 22:03:34 +0000385 cerr << "Launch "<< clientCount << " client threads" << endl;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000386
Marc Slemkob09f5882006-08-23 22:03:34 +0000387 time00 = Util::currentTime();
Marc Slemko056f9ba2006-08-17 02:59:05 +0000388
Marc Slemkob09f5882006-08-23 22:03:34 +0000389 monitor.notifyAll();
390
391 while(threadCount > 0) {
392 monitor.wait();
393 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000394
Marc Slemkob09f5882006-08-23 22:03:34 +0000395 time01 = Util::currentTime();
Marc Slemko3ea00332006-08-17 01:11:13 +0000396 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000397
Marc Slemkob09f5882006-08-23 22:03:34 +0000398 long long firstTime = 9223372036854775807LL;
399 long long lastTime = 0;
Marc Slemko3ea00332006-08-17 01:11:13 +0000400
Marc Slemkob09f5882006-08-23 22:03:34 +0000401 double averageTime = 0;
402 long long minTime = 9223372036854775807LL;
403 long long maxTime = 0;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000404
Marc Slemkob09f5882006-08-23 22:03:34 +0000405 for(set<shared_ptr<Thread> >::iterator ix = clientThreads.begin(); ix != clientThreads.end(); ix++) {
Marc Slemko056f9ba2006-08-17 02:59:05 +0000406
Marc Slemkob09f5882006-08-23 22:03:34 +0000407 shared_ptr<ClientThread> client = dynamic_pointer_cast<ClientThread>((*ix)->runnable());
Marc Slemko056f9ba2006-08-17 02:59:05 +0000408
Marc Slemkob09f5882006-08-23 22:03:34 +0000409 long long delta = client->_endTime - client->_startTime;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000410
Marc Slemkob09f5882006-08-23 22:03:34 +0000411 assert(delta > 0);
Marc Slemko056f9ba2006-08-17 02:59:05 +0000412
Marc Slemkob09f5882006-08-23 22:03:34 +0000413 if(client->_startTime < firstTime) {
414 firstTime = client->_startTime;
415 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000416
Marc Slemkob09f5882006-08-23 22:03:34 +0000417 if(client->_endTime > lastTime) {
418 lastTime = client->_endTime;
419 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000420
Marc Slemkob09f5882006-08-23 22:03:34 +0000421 if(delta < minTime) {
422 minTime = delta;
423 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000424
Marc Slemkob09f5882006-08-23 22:03:34 +0000425 if(delta > maxTime) {
426 maxTime = delta;
427 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000428
Marc Slemkob09f5882006-08-23 22:03:34 +0000429 averageTime+= delta;
430 }
431
432 averageTime /= clientCount;
433
434
435 cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl;
436
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000437 count_map count = serviceHandler->getCount();
438 count_map::iterator iter;
439 for (iter = count.begin(); iter != count.end(); ++iter) {
440 printf("%s => %d\n", iter->first, iter->second);
441 }
Marc Slemkob09f5882006-08-23 22:03:34 +0000442 cerr << "done." << endl;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000443 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000444
Marc Slemko3ea00332006-08-17 01:11:13 +0000445 return 0;
446}