blob: 062f9037cc11d2a12e8a1ac1d1c7ebfb41d6777c [file] [log] [blame]
Marc Slemko3ea00332006-08-17 01:11:13 +00001#include <concurrency/ThreadManager.h>
2#include <concurrency/PosixThreadFactory.h>
3#include <concurrency/Monitor.h>
4#include <concurrency/Util.h>
5#include <protocol/TBinaryProtocol.h>
6#include <server/TSimpleServer.h>
7#include <server/TThreadPoolServer.h>
8#include <transport/TServerSocket.h>
9#include <transport/TSocket.h>
10#include <transport/TBufferedTransport.h>
11#include "StressTest.h"
12
13#include <iostream>
14#include <set>
15#include <stdexcept>
16#include <sstream>
17
18using namespace std;
19
20using namespace facebook::thrift;
21using namespace facebook::thrift::protocol;
22using namespace facebook::thrift::transport;
23using namespace facebook::thrift::server;
24
25using namespace test::stress;
26
27class Server : public ServiceServerIf {
28 public:
29 Server(shared_ptr<TProtocol> protocol) :
30 ServiceServerIf(protocol) {}
31
32 void echoVoid() {return;}
33 uint8_t echoByte(uint8_t arg) {return arg;}
Marc Slemkod97eb612006-08-24 23:37:36 +000034 int16_t echoI16(int16_t arg) {return arg;}
35 int32_t echoI32(int32_t arg) {return arg;}
36 int64_t echoI64(int64_t arg) {return arg;}
Marc Slemko3ea00332006-08-17 01:11:13 +000037 uint16_t echoU16(uint16_t arg) {return arg;}
38 uint32_t echoU32(uint32_t arg) {return arg;}
39 uint64_t echoU64(uint64_t arg) {return arg;}
Marc Slemkob09f5882006-08-23 22:03:34 +000040 string echoString(string arg) {return arg;}
41 list<uint8_t> echoList(list<uint8_t> arg) {return arg;}
42 set<uint8_t> echoSet(set<uint8_t> arg) {return arg;}
43 map<uint8_t, uint8_t> echoMap(map<uint8_t, uint8_t> arg) {return arg;}
Marc Slemko3ea00332006-08-17 01:11:13 +000044};
45
46class ClientThread: public Runnable {
47public:
48
Marc Slemkod97eb612006-08-24 23:37:36 +000049 ClientThread(shared_ptr<TTransport>transport, shared_ptr<ServiceClient> client, Monitor& monitor, size_t& workerCount, size_t loopCount, TType loopType) :
Marc Slemko3ea00332006-08-17 01:11:13 +000050 _transport(transport),
51 _client(client),
52 _monitor(monitor),
53 _workerCount(workerCount),
Marc Slemkod97eb612006-08-24 23:37:36 +000054 _loopCount(loopCount),
55 _loopType(loopType)
Marc Slemko3ea00332006-08-17 01:11:13 +000056 {}
57
58 void run() {
59
60 // Wait for all worker threads to start
61
62 {Synchronized s(_monitor);
63 while(_workerCount == 0) {
64 _monitor.wait();
65 }
66 }
67
68 _startTime = Util::currentTime();
69
70 _transport->open();
71
Marc Slemkod97eb612006-08-24 23:37:36 +000072 switch(_loopType) {
73 case T_VOID: loopEchoVoid(); break;
74 case T_BYTE: loopEchoByte(); break;
75 case T_I16: loopEchoI16(); break;
76 case T_I32: loopEchoI32(); break;
77 case T_I64: loopEchoI64(); break;
78 case T_U16: loopEchoU16(); break;
79 case T_U32: loopEchoU32(); break;
80 case T_U64: loopEchoU64(); break;
81 case T_STRING: loopEchoString(); break;
82 default: cerr << "Unexpected loop type" << _loopType << endl; break;
Marc Slemko3ea00332006-08-17 01:11:13 +000083 }
84
85 _endTime = Util::currentTime();
86
87 _transport->close();
88
89 _done = true;
90
91 {Synchronized s(_monitor);
92
93 _workerCount--;
Marc Slemko056f9ba2006-08-17 02:59:05 +000094
Marc Slemko3ea00332006-08-17 01:11:13 +000095 if(_workerCount == 0) {
96
97 _monitor.notify();
98 }
99 }
100 }
Marc Slemkod97eb612006-08-24 23:37:36 +0000101
102 void loopEchoVoid() {
103 for(size_t ix = 0; ix < _loopCount; ix++) {
104 _client->echoVoid();
105 }
106 }
107
108 void loopEchoByte() {
109 for(size_t ix = 0; ix < _loopCount; ix++) {
110 uint8_t arg = 1;
111 uint8_t result;
112 result =_client->echoByte(arg);
113 assert(result == arg);
114 }
115 }
116
117 void loopEchoI16() {
118 for(size_t ix = 0; ix < _loopCount; ix++) {
119 uint16_t arg = 1;
120 uint16_t result;
121 result =_client->echoI16(arg);
122 assert(result == arg);
123 }
124 }
125
126 void loopEchoI32() {
127 for(size_t ix = 0; ix < _loopCount; ix++) {
128 uint32_t arg = 1;
129 uint32_t result;
130 result =_client->echoI32(arg);
131 assert(result == arg);
132 }
133 }
134
135 void loopEchoI64() {
136 for(size_t ix = 0; ix < _loopCount; ix++) {
137 uint64_t arg = 1;
138 uint64_t result;
139 result =_client->echoI64(arg);
140 assert(result == arg);
141 }
142 }
143
144 void loopEchoU16() {
145 for(size_t ix = 0; ix < _loopCount; ix++) {
146 uint16_t arg = 1;
147 uint16_t result;
148 result =_client->echoU16(arg);
149 assert(result == arg);
150 }
151 }
152
153 void loopEchoU32() {
154 for(size_t ix = 0; ix < _loopCount; ix++) {
155 uint32_t arg = 1;
156 uint32_t result;
157 result =_client->echoU32(arg);
158 assert(result == arg);
159 }
160 }
161
162 void loopEchoU64() {
163 for(size_t ix = 0; ix < _loopCount; ix++) {
164 uint64_t arg = 1;
165 uint64_t result;
166 result =_client->echoU64(arg);
167 assert(result == arg);
168 }
169 }
170
171 void loopEchoString() {
172 for(size_t ix = 0; ix < _loopCount; ix++) {
173 string arg = "hello";
174 string result;
175 result =_client->echoString(arg);
176 assert(result == arg);
177 }
178 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000179
Marc Slemko3ea00332006-08-17 01:11:13 +0000180 shared_ptr<TTransport> _transport;
181 shared_ptr<ServiceClient> _client;
182 Monitor& _monitor;
183 size_t& _workerCount;
184 size_t _loopCount;
Marc Slemkod97eb612006-08-24 23:37:36 +0000185 TType _loopType;
Marc Slemko3ea00332006-08-17 01:11:13 +0000186 long long _startTime;
187 long long _endTime;
188 bool _done;
189 Monitor _sleep;
190};
191
Marc Slemko3ea00332006-08-17 01:11:13 +0000192int main(int argc, char **argv) {
193
194 int port = 9090;
195 string serverType = "thread-pool";
196 string protocolType = "binary";
197 size_t workerCount = 4;
198 size_t clientCount = 10;
199 size_t loopCount = 10000;
Marc Slemkod97eb612006-08-24 23:37:36 +0000200 TType loopType = T_VOID;
201 string callName = "echoVoid";
Marc Slemkob09f5882006-08-23 22:03:34 +0000202 bool runServer = true;
Marc Slemko3ea00332006-08-17 01:11:13 +0000203
Marc Slemkob09f5882006-08-23 22:03:34 +0000204 ostringstream usage;
Marc Slemko3ea00332006-08-17 01:11:13 +0000205
206 usage <<
Marc Slemkob09f5882006-08-23 22:03:34 +0000207 argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>] [--clients=<client-count>] [--loop=<loop-count>]" << endl <<
208 "\tclients Number of client threads to create - 0 implies no clients, i.e. server only. Default is " << clientCount << endl <<
209 "\thelp Prints this help text." << endl <<
Marc Slemkod97eb612006-08-24 23:37:36 +0000210 "\tcall Service method to call. Default is " << callName << endl <<
Marc Slemkob09f5882006-08-23 22:03:34 +0000211 "\tloop The number of remote thrift calls each client makes. Default is " << loopCount << endl <<
212 "\tport The port the server and clients should bind to for thrift network connections. Default is " << port << endl <<
213 "\tserver Run the Thrift server in this process. Default is " << runServer << endl <<
214 "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl <<
215 "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl <<
216 "\tworkers Number of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl;
Marc Slemko3ea00332006-08-17 01:11:13 +0000217
218 map<string, string> args;
219
220 for(int ix = 1; ix < argc; ix++) {
221
222 string arg(argv[ix]);
223
224 if(arg.compare(0,2, "--") == 0) {
225
226 size_t end = arg.find_first_of("=", 2);
227
Marc Slemko056f9ba2006-08-17 02:59:05 +0000228 string key = string(arg, 2, end - 2);
229
Marc Slemko3ea00332006-08-17 01:11:13 +0000230 if(end != string::npos) {
Marc Slemko056f9ba2006-08-17 02:59:05 +0000231 args[key] = string(arg, end + 1);
Marc Slemko3ea00332006-08-17 01:11:13 +0000232 } else {
Marc Slemko056f9ba2006-08-17 02:59:05 +0000233 args[key] = "true";
Marc Slemko3ea00332006-08-17 01:11:13 +0000234 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000235 } else {
236 throw invalid_argument("Unexcepted command line token: "+arg);
237 }
238 }
239
240 try {
241
Marc Slemkob09f5882006-08-23 22:03:34 +0000242 if(!args["clients"].empty()) {
243 clientCount = atoi(args["clients"].c_str());
244 }
245
246 if(!args["help"].empty()) {
247 cerr << usage.str();
248 return 0;
249 }
250
251 if(!args["loop"].empty()) {
252 loopCount = atoi(args["loop"].c_str());
253 }
254
Marc Slemkod97eb612006-08-24 23:37:36 +0000255 if(!args["call"].empty()) {
256 callName = args["call"];
257 }
258
Marc Slemko3ea00332006-08-17 01:11:13 +0000259 if(!args["port"].empty()) {
260 port = atoi(args["port"].c_str());
261 }
262
Marc Slemkob09f5882006-08-23 22:03:34 +0000263 if(!args["server"].empty()) {
264 runServer = args["server"] == "true";
265 }
266
Marc Slemko3ea00332006-08-17 01:11:13 +0000267 if(!args["server-type"].empty()) {
268 serverType = args["server-type"];
269
270 if(serverType == "simple") {
271
272 } else if(serverType == "thread-pool") {
273
274 } else {
275
276 throw invalid_argument("Unknown server type "+serverType);
277 }
278 }
279
280 if(!args["workers"].empty()) {
281 workerCount = atoi(args["workers"].c_str());
282 }
283
Marc Slemko3ea00332006-08-17 01:11:13 +0000284 } catch(exception& e) {
285 cerr << e.what() << endl;
286 cerr << usage;
287 }
288
Marc Slemko3ea00332006-08-17 01:11:13 +0000289 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
290
Marc Slemkob09f5882006-08-23 22:03:34 +0000291 if(runServer) {
Marc Slemko3ea00332006-08-17 01:11:13 +0000292
Marc Slemkob09f5882006-08-23 22:03:34 +0000293 // Dispatcher
294 shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol);
Marc Slemko3ea00332006-08-17 01:11:13 +0000295
Marc Slemkob09f5882006-08-23 22:03:34 +0000296 shared_ptr<Server> server(new Server(binaryProtocol));
Marc Slemko3ea00332006-08-17 01:11:13 +0000297
Marc Slemkob09f5882006-08-23 22:03:34 +0000298 // Options
299 shared_ptr<TServerOptions> serverOptions(new TServerOptions());
Marc Slemko3ea00332006-08-17 01:11:13 +0000300
Marc Slemkob09f5882006-08-23 22:03:34 +0000301 // Transport
302 shared_ptr<TServerSocket> serverSocket(new TServerSocket(port));
Marc Slemko3ea00332006-08-17 01:11:13 +0000303
Marc Slemkob09f5882006-08-23 22:03:34 +0000304 // ThreadFactory
Marc Slemko3ea00332006-08-17 01:11:13 +0000305
Marc Slemkob09f5882006-08-23 22:03:34 +0000306 shared_ptr<Thread> serverThread;
Marc Slemko3ea00332006-08-17 01:11:13 +0000307
Marc Slemkob09f5882006-08-23 22:03:34 +0000308 if(serverType == "simple") {
309
310 serverThread = threadFactory->newThread(shared_ptr<Runnable>(new TSimpleServer(server, serverOptions, serverSocket)));
311
312 } else if(serverType == "thread-pool") {
313
314 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
315
316 threadManager->threadFactory(threadFactory);
317
318 threadManager->start();
319
320 serverThread = threadFactory->newThread(shared_ptr<TServer>(new TThreadPoolServer(server,
321 serverOptions,
322 serverSocket,
323 threadManager)));
324 }
325
326 cerr << "Starting the server on port " << port << endl;
327
328 serverThread->start();
329
330 // If we aren't running clients, just wait forever for external clients
331
332 if(clientCount == 0) {
333 serverThread->join();
334 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000335 }
336
Marc Slemkob09f5882006-08-23 22:03:34 +0000337 if(clientCount > 0) {
Marc Slemko3ea00332006-08-17 01:11:13 +0000338
Marc Slemkob09f5882006-08-23 22:03:34 +0000339 Monitor monitor;
Marc Slemko3ea00332006-08-17 01:11:13 +0000340
Marc Slemkob09f5882006-08-23 22:03:34 +0000341 size_t threadCount = 0;
Marc Slemko3ea00332006-08-17 01:11:13 +0000342
Marc Slemkob09f5882006-08-23 22:03:34 +0000343 set<shared_ptr<Thread> > clientThreads;
Marc Slemko3ea00332006-08-17 01:11:13 +0000344
Marc Slemkod97eb612006-08-24 23:37:36 +0000345 if(callName == "echoVoid") { loopType = T_VOID;}
346 else if(callName == "echoByte") { loopType = T_BYTE;}
347 else if(callName == "echoI16") { loopType = T_I16;}
348 else if(callName == "echoI32") { loopType = T_I32;}
349 else if(callName == "echoI64") { loopType = T_I64;}
350 else if(callName == "echoU16") { loopType = T_U16;}
351 else if(callName == "echoU32") { loopType = T_U32;}
352 else if(callName == "echoU64") { loopType = T_U64;}
353 else if(callName == "echoString") { loopType = T_STRING;}
354 else {throw invalid_argument("Unknown service call "+callName);}
355
Marc Slemkob09f5882006-08-23 22:03:34 +0000356 for(size_t ix = 0; ix < clientCount; ix++) {
Marc Slemko3ea00332006-08-17 01:11:13 +0000357
Marc Slemkob09f5882006-08-23 22:03:34 +0000358 shared_ptr<TSocket> socket(new TSocket("127.0.01", port));
359 shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
360 shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol());
361 shared_ptr<ServiceClient> serviceClient(new ServiceClient(bufferedSocket, binaryProtocol));
Marc Slemko3ea00332006-08-17 01:11:13 +0000362
Marc Slemkod97eb612006-08-24 23:37:36 +0000363 clientThreads.insert(threadFactory->newThread(shared_ptr<ClientThread>(new ClientThread(bufferedSocket, serviceClient, monitor, threadCount, loopCount, loopType))));
Marc Slemkob09f5882006-08-23 22:03:34 +0000364 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000365
Marc Slemkob09f5882006-08-23 22:03:34 +0000366 for(std::set<shared_ptr<Thread> >::const_iterator thread = clientThreads.begin(); thread != clientThreads.end(); thread++) {
367 (*thread)->start();
368 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000369
Marc Slemkob09f5882006-08-23 22:03:34 +0000370 long long time00;
371 long long time01;
Marc Slemko3ea00332006-08-17 01:11:13 +0000372
Marc Slemkob09f5882006-08-23 22:03:34 +0000373 {Synchronized s(monitor);
374 threadCount = clientCount;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000375
Marc Slemkob09f5882006-08-23 22:03:34 +0000376 cerr << "Launch "<< clientCount << " client threads" << endl;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000377
Marc Slemkob09f5882006-08-23 22:03:34 +0000378 time00 = Util::currentTime();
Marc Slemko056f9ba2006-08-17 02:59:05 +0000379
Marc Slemkob09f5882006-08-23 22:03:34 +0000380 monitor.notifyAll();
381
382 while(threadCount > 0) {
383 monitor.wait();
384 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000385
Marc Slemkob09f5882006-08-23 22:03:34 +0000386 time01 = Util::currentTime();
Marc Slemko3ea00332006-08-17 01:11:13 +0000387 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000388
Marc Slemkob09f5882006-08-23 22:03:34 +0000389 long long firstTime = 9223372036854775807LL;
390 long long lastTime = 0;
Marc Slemko3ea00332006-08-17 01:11:13 +0000391
Marc Slemkob09f5882006-08-23 22:03:34 +0000392 double averageTime = 0;
393 long long minTime = 9223372036854775807LL;
394 long long maxTime = 0;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000395
Marc Slemkob09f5882006-08-23 22:03:34 +0000396 for(set<shared_ptr<Thread> >::iterator ix = clientThreads.begin(); ix != clientThreads.end(); ix++) {
Marc Slemko056f9ba2006-08-17 02:59:05 +0000397
Marc Slemkob09f5882006-08-23 22:03:34 +0000398 shared_ptr<ClientThread> client = dynamic_pointer_cast<ClientThread>((*ix)->runnable());
Marc Slemko056f9ba2006-08-17 02:59:05 +0000399
Marc Slemkob09f5882006-08-23 22:03:34 +0000400 long long delta = client->_endTime - client->_startTime;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000401
Marc Slemkob09f5882006-08-23 22:03:34 +0000402 assert(delta > 0);
Marc Slemko056f9ba2006-08-17 02:59:05 +0000403
Marc Slemkob09f5882006-08-23 22:03:34 +0000404 if(client->_startTime < firstTime) {
405 firstTime = client->_startTime;
406 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000407
Marc Slemkob09f5882006-08-23 22:03:34 +0000408 if(client->_endTime > lastTime) {
409 lastTime = client->_endTime;
410 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000411
Marc Slemkob09f5882006-08-23 22:03:34 +0000412 if(delta < minTime) {
413 minTime = delta;
414 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000415
Marc Slemkob09f5882006-08-23 22:03:34 +0000416 if(delta > maxTime) {
417 maxTime = delta;
418 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000419
Marc Slemkob09f5882006-08-23 22:03:34 +0000420 averageTime+= delta;
421 }
422
423 averageTime /= clientCount;
424
425
426 cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl;
427
428 cerr << "done." << endl;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000429 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000430
Marc Slemko3ea00332006-08-17 01:11:13 +0000431 return 0;
432}