// blob: ff1237e6eda4e509eed38f94cbc91bf802881538
#include <concurrency/ThreadManager.h>
#include <concurrency/PosixThreadFactory.h>
#include <concurrency/Monitor.h>
#include <concurrency/Util.h>
#include <protocol/TBinaryProtocol.h>
#include <server/TSimpleServer.h>
#include <server/TThreadPoolServer.h>
#include <transport/TServerSocket.h>
#include <transport/TSocket.h>
#include <transport/TBufferedTransport.h>
#include <transport/TBufferedTransportFactory.h>
#include <transport/TBufferedRouterTransport.h>
#include <transport/TBufferedRouterTransportFactory.h>
#include <transport/TBufferedFileWriter.h>

#include "Service.h"

#include <cassert>
#include <cstdlib>
#include <iostream>
#include <map>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

using namespace std;

using namespace facebook::thrift;
using namespace facebook::thrift::protocol;
using namespace facebook::thrift::transport;
using namespace facebook::thrift::server;

using namespace test::stress;
31
Aditya Agarwal3950f472006-10-11 02:50:15 +000032class Server : public ServiceIf {
Marc Slemko3ea00332006-08-17 01:11:13 +000033 public:
Aditya Agarwal3950f472006-10-11 02:50:15 +000034 Server() {};
Marc Slemko3ea00332006-08-17 01:11:13 +000035 void echoVoid() {return;}
Aditya Agarwal3950f472006-10-11 02:50:15 +000036 int8_t echoByte(int8_t arg) {return arg;}
Marc Slemkod97eb612006-08-24 23:37:36 +000037 int32_t echoI32(int32_t arg) {return arg;}
38 int64_t echoI64(int64_t arg) {return arg;}
Marc Slemkob09f5882006-08-23 22:03:34 +000039 string echoString(string arg) {return arg;}
Aditya Agarwal3950f472006-10-11 02:50:15 +000040 vector<int8_t> echoList(vector<int8_t> arg) {return arg;}
41 set<int8_t> echoSet(set<int8_t> arg) {return arg;}
42 map<int8_t, int8_t> echoMap(map<int8_t, int8_t> arg) {return arg;}
Marc Slemko3ea00332006-08-17 01:11:13 +000043};
44
45class ClientThread: public Runnable {
46public:
47
Marc Slemkod97eb612006-08-24 23:37:36 +000048 ClientThread(shared_ptr<TTransport>transport, shared_ptr<ServiceClient> client, Monitor& monitor, size_t& workerCount, size_t loopCount, TType loopType) :
Marc Slemko3ea00332006-08-17 01:11:13 +000049 _transport(transport),
50 _client(client),
51 _monitor(monitor),
52 _workerCount(workerCount),
Marc Slemkod97eb612006-08-24 23:37:36 +000053 _loopCount(loopCount),
54 _loopType(loopType)
Marc Slemko3ea00332006-08-17 01:11:13 +000055 {}
56
57 void run() {
58
59 // Wait for all worker threads to start
60
61 {Synchronized s(_monitor);
62 while(_workerCount == 0) {
63 _monitor.wait();
64 }
65 }
66
67 _startTime = Util::currentTime();
68
69 _transport->open();
70
Marc Slemkod97eb612006-08-24 23:37:36 +000071 switch(_loopType) {
72 case T_VOID: loopEchoVoid(); break;
73 case T_BYTE: loopEchoByte(); break;
Marc Slemkod97eb612006-08-24 23:37:36 +000074 case T_I32: loopEchoI32(); break;
75 case T_I64: loopEchoI64(); break;
Marc Slemkod97eb612006-08-24 23:37:36 +000076 case T_STRING: loopEchoString(); break;
77 default: cerr << "Unexpected loop type" << _loopType << endl; break;
Marc Slemko3ea00332006-08-17 01:11:13 +000078 }
79
80 _endTime = Util::currentTime();
81
82 _transport->close();
83
84 _done = true;
85
86 {Synchronized s(_monitor);
87
88 _workerCount--;
Marc Slemko056f9ba2006-08-17 02:59:05 +000089
Marc Slemko3ea00332006-08-17 01:11:13 +000090 if(_workerCount == 0) {
91
92 _monitor.notify();
93 }
94 }
95 }
Marc Slemkod97eb612006-08-24 23:37:36 +000096
97 void loopEchoVoid() {
98 for(size_t ix = 0; ix < _loopCount; ix++) {
99 _client->echoVoid();
100 }
101 }
102
103 void loopEchoByte() {
104 for(size_t ix = 0; ix < _loopCount; ix++) {
Aditya Agarwal3950f472006-10-11 02:50:15 +0000105 int8_t arg = 1;
106 int8_t result;
Marc Slemkod97eb612006-08-24 23:37:36 +0000107 result =_client->echoByte(arg);
108 assert(result == arg);
109 }
110 }
111
Marc Slemkod97eb612006-08-24 23:37:36 +0000112 void loopEchoI32() {
113 for(size_t ix = 0; ix < _loopCount; ix++) {
Aditya Agarwal3950f472006-10-11 02:50:15 +0000114 int32_t arg = 1;
115 int32_t result;
Marc Slemkod97eb612006-08-24 23:37:36 +0000116 result =_client->echoI32(arg);
117 assert(result == arg);
118 }
119 }
120
121 void loopEchoI64() {
122 for(size_t ix = 0; ix < _loopCount; ix++) {
Aditya Agarwal3950f472006-10-11 02:50:15 +0000123 int64_t arg = 1;
124 int64_t result;
Marc Slemkod97eb612006-08-24 23:37:36 +0000125 result =_client->echoI64(arg);
126 assert(result == arg);
127 }
128 }
Aditya Agarwal3950f472006-10-11 02:50:15 +0000129
Marc Slemkod97eb612006-08-24 23:37:36 +0000130 void loopEchoString() {
131 for(size_t ix = 0; ix < _loopCount; ix++) {
132 string arg = "hello";
133 string result;
134 result =_client->echoString(arg);
135 assert(result == arg);
136 }
137 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000138
Marc Slemko3ea00332006-08-17 01:11:13 +0000139 shared_ptr<TTransport> _transport;
140 shared_ptr<ServiceClient> _client;
141 Monitor& _monitor;
142 size_t& _workerCount;
143 size_t _loopCount;
Marc Slemkod97eb612006-08-24 23:37:36 +0000144 TType _loopType;
Marc Slemko3ea00332006-08-17 01:11:13 +0000145 long long _startTime;
146 long long _endTime;
147 bool _done;
148 Monitor _sleep;
149};
150
Marc Slemko3ea00332006-08-17 01:11:13 +0000151int main(int argc, char **argv) {
152
153 int port = 9090;
154 string serverType = "thread-pool";
155 string protocolType = "binary";
156 size_t workerCount = 4;
157 size_t clientCount = 10;
158 size_t loopCount = 10000;
Marc Slemkod97eb612006-08-24 23:37:36 +0000159 TType loopType = T_VOID;
160 string callName = "echoVoid";
Marc Slemkob09f5882006-08-23 22:03:34 +0000161 bool runServer = true;
Aditya Agarwal3950f472006-10-11 02:50:15 +0000162 bool logRequests = false;
163 string requestLogPath = "./requestlog.tlog";
Marc Slemko3ea00332006-08-17 01:11:13 +0000164
Marc Slemkob09f5882006-08-23 22:03:34 +0000165 ostringstream usage;
Marc Slemko3ea00332006-08-17 01:11:13 +0000166
167 usage <<
Marc Slemkob09f5882006-08-23 22:03:34 +0000168 argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>] [--clients=<client-count>] [--loop=<loop-count>]" << endl <<
169 "\tclients Number of client threads to create - 0 implies no clients, i.e. server only. Default is " << clientCount << endl <<
170 "\thelp Prints this help text." << endl <<
Marc Slemkod97eb612006-08-24 23:37:36 +0000171 "\tcall Service method to call. Default is " << callName << endl <<
Marc Slemkob09f5882006-08-23 22:03:34 +0000172 "\tloop The number of remote thrift calls each client makes. Default is " << loopCount << endl <<
173 "\tport The port the server and clients should bind to for thrift network connections. Default is " << port << endl <<
174 "\tserver Run the Thrift server in this process. Default is " << runServer << endl <<
175 "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl <<
176 "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl <<
Aditya Agarwal3950f472006-10-11 02:50:15 +0000177 "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests << endl <<
Marc Slemkob09f5882006-08-23 22:03:34 +0000178 "\tworkers Number of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl;
Marc Slemko3ea00332006-08-17 01:11:13 +0000179
180 map<string, string> args;
181
182 for(int ix = 1; ix < argc; ix++) {
183
184 string arg(argv[ix]);
185
186 if(arg.compare(0,2, "--") == 0) {
187
188 size_t end = arg.find_first_of("=", 2);
189
Marc Slemko056f9ba2006-08-17 02:59:05 +0000190 string key = string(arg, 2, end - 2);
191
Marc Slemko3ea00332006-08-17 01:11:13 +0000192 if(end != string::npos) {
Marc Slemko056f9ba2006-08-17 02:59:05 +0000193 args[key] = string(arg, end + 1);
Marc Slemko3ea00332006-08-17 01:11:13 +0000194 } else {
Marc Slemko056f9ba2006-08-17 02:59:05 +0000195 args[key] = "true";
Marc Slemko3ea00332006-08-17 01:11:13 +0000196 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000197 } else {
198 throw invalid_argument("Unexcepted command line token: "+arg);
199 }
200 }
201
202 try {
203
Marc Slemkob09f5882006-08-23 22:03:34 +0000204 if(!args["clients"].empty()) {
205 clientCount = atoi(args["clients"].c_str());
206 }
207
208 if(!args["help"].empty()) {
209 cerr << usage.str();
210 return 0;
211 }
212
213 if(!args["loop"].empty()) {
214 loopCount = atoi(args["loop"].c_str());
215 }
216
Marc Slemkod97eb612006-08-24 23:37:36 +0000217 if(!args["call"].empty()) {
218 callName = args["call"];
219 }
220
Marc Slemko3ea00332006-08-17 01:11:13 +0000221 if(!args["port"].empty()) {
222 port = atoi(args["port"].c_str());
223 }
224
Marc Slemkob09f5882006-08-23 22:03:34 +0000225 if(!args["server"].empty()) {
226 runServer = args["server"] == "true";
227 }
228
Aditya Agarwal3950f472006-10-11 02:50:15 +0000229 if(!args["log-request"].empty()) {
230 logRequests = args["log-request"] == "true";
231 }
232
Marc Slemko3ea00332006-08-17 01:11:13 +0000233 if(!args["server-type"].empty()) {
234 serverType = args["server-type"];
235
236 if(serverType == "simple") {
237
238 } else if(serverType == "thread-pool") {
239
240 } else {
241
242 throw invalid_argument("Unknown server type "+serverType);
243 }
244 }
245
246 if(!args["workers"].empty()) {
247 workerCount = atoi(args["workers"].c_str());
248 }
249
Marc Slemko3ea00332006-08-17 01:11:13 +0000250 } catch(exception& e) {
251 cerr << e.what() << endl;
252 cerr << usage;
253 }
254
Marc Slemko3ea00332006-08-17 01:11:13 +0000255 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
256
Marc Slemkob09f5882006-08-23 22:03:34 +0000257 if(runServer) {
Marc Slemko3ea00332006-08-17 01:11:13 +0000258
Marc Slemkob09f5882006-08-23 22:03:34 +0000259 // Dispatcher
260 shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol);
Marc Slemko3ea00332006-08-17 01:11:13 +0000261
Aditya Agarwal3950f472006-10-11 02:50:15 +0000262 shared_ptr<Server> serviceHandler(new Server());
Marc Slemko3ea00332006-08-17 01:11:13 +0000263
Aditya Agarwal3950f472006-10-11 02:50:15 +0000264 shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler, binaryProtocol));
Marc Slemko3ea00332006-08-17 01:11:13 +0000265
Marc Slemkob09f5882006-08-23 22:03:34 +0000266 // Transport
267 shared_ptr<TServerSocket> serverSocket(new TServerSocket(port));
Marc Slemko3ea00332006-08-17 01:11:13 +0000268
Aditya Agarwal3950f472006-10-11 02:50:15 +0000269 // Transport Factory
270 shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
271
272 // Options
273 shared_ptr<TServerOptions> serverOptions(new TServerOptions());
274
275 if (logRequests) {
276 // initialize the log file
277 shared_ptr<TBufferedFileWriter> bufferedFileWriter(new TBufferedFileWriter(requestLogPath, 1000));
278 bufferedFileWriter->setChunkSize(2 * 1024 * 1024);
279 bufferedFileWriter->setMaxEventSize(1024 * 16);
280
281 transportFactory = shared_ptr<TTransportFactory>(new TBufferedRouterTransportFactory(bufferedFileWriter));
282 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000283
Marc Slemkob09f5882006-08-23 22:03:34 +0000284 shared_ptr<Thread> serverThread;
Marc Slemko3ea00332006-08-17 01:11:13 +0000285
Marc Slemkob09f5882006-08-23 22:03:34 +0000286 if(serverType == "simple") {
287
Aditya Agarwal3950f472006-10-11 02:50:15 +0000288 serverThread = threadFactory->newThread(shared_ptr<TServer>(new TSimpleServer(serviceProcessor,
289 serverSocket,
290 transportFactory,
291 serverOptions)));
Marc Slemkob09f5882006-08-23 22:03:34 +0000292
293 } else if(serverType == "thread-pool") {
294
295 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
296
297 threadManager->threadFactory(threadFactory);
298
299 threadManager->start();
Aditya Agarwal3950f472006-10-11 02:50:15 +0000300
301 serverThread = threadFactory->newThread(shared_ptr<TServer>(new TThreadPoolServer(serviceProcessor,
302 serverSocket,
303 transportFactory,
304 threadManager,
305 serverOptions)));
Marc Slemkob09f5882006-08-23 22:03:34 +0000306 }
307
308 cerr << "Starting the server on port " << port << endl;
309
310 serverThread->start();
311
312 // If we aren't running clients, just wait forever for external clients
313
314 if(clientCount == 0) {
315 serverThread->join();
316 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000317 }
318
Marc Slemkob09f5882006-08-23 22:03:34 +0000319 if(clientCount > 0) {
Marc Slemko3ea00332006-08-17 01:11:13 +0000320
Marc Slemkob09f5882006-08-23 22:03:34 +0000321 Monitor monitor;
Marc Slemko3ea00332006-08-17 01:11:13 +0000322
Marc Slemkob09f5882006-08-23 22:03:34 +0000323 size_t threadCount = 0;
Marc Slemko3ea00332006-08-17 01:11:13 +0000324
Marc Slemkob09f5882006-08-23 22:03:34 +0000325 set<shared_ptr<Thread> > clientThreads;
Marc Slemko3ea00332006-08-17 01:11:13 +0000326
Marc Slemkod97eb612006-08-24 23:37:36 +0000327 if(callName == "echoVoid") { loopType = T_VOID;}
328 else if(callName == "echoByte") { loopType = T_BYTE;}
Marc Slemkod97eb612006-08-24 23:37:36 +0000329 else if(callName == "echoI32") { loopType = T_I32;}
330 else if(callName == "echoI64") { loopType = T_I64;}
Marc Slemkod97eb612006-08-24 23:37:36 +0000331 else if(callName == "echoString") { loopType = T_STRING;}
332 else {throw invalid_argument("Unknown service call "+callName);}
333
Marc Slemkob09f5882006-08-23 22:03:34 +0000334 for(size_t ix = 0; ix < clientCount; ix++) {
Marc Slemko3ea00332006-08-17 01:11:13 +0000335
Marc Slemkob09f5882006-08-23 22:03:34 +0000336 shared_ptr<TSocket> socket(new TSocket("127.0.01", port));
337 shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
338 shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol());
Aditya Agarwal3950f472006-10-11 02:50:15 +0000339 shared_ptr<ServiceClient> serviceClient(new ServiceClient(socket, binaryProtocol));
Marc Slemko3ea00332006-08-17 01:11:13 +0000340
Aditya Agarwal3950f472006-10-11 02:50:15 +0000341 clientThreads.insert(threadFactory->newThread(shared_ptr<ClientThread>(new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType))));
Marc Slemkob09f5882006-08-23 22:03:34 +0000342 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000343
Marc Slemkob09f5882006-08-23 22:03:34 +0000344 for(std::set<shared_ptr<Thread> >::const_iterator thread = clientThreads.begin(); thread != clientThreads.end(); thread++) {
345 (*thread)->start();
346 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000347
Marc Slemkob09f5882006-08-23 22:03:34 +0000348 long long time00;
349 long long time01;
Marc Slemko3ea00332006-08-17 01:11:13 +0000350
Marc Slemkob09f5882006-08-23 22:03:34 +0000351 {Synchronized s(monitor);
352 threadCount = clientCount;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000353
Marc Slemkob09f5882006-08-23 22:03:34 +0000354 cerr << "Launch "<< clientCount << " client threads" << endl;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000355
Marc Slemkob09f5882006-08-23 22:03:34 +0000356 time00 = Util::currentTime();
Marc Slemko056f9ba2006-08-17 02:59:05 +0000357
Marc Slemkob09f5882006-08-23 22:03:34 +0000358 monitor.notifyAll();
359
360 while(threadCount > 0) {
361 monitor.wait();
362 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000363
Marc Slemkob09f5882006-08-23 22:03:34 +0000364 time01 = Util::currentTime();
Marc Slemko3ea00332006-08-17 01:11:13 +0000365 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000366
Marc Slemkob09f5882006-08-23 22:03:34 +0000367 long long firstTime = 9223372036854775807LL;
368 long long lastTime = 0;
Marc Slemko3ea00332006-08-17 01:11:13 +0000369
Marc Slemkob09f5882006-08-23 22:03:34 +0000370 double averageTime = 0;
371 long long minTime = 9223372036854775807LL;
372 long long maxTime = 0;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000373
Marc Slemkob09f5882006-08-23 22:03:34 +0000374 for(set<shared_ptr<Thread> >::iterator ix = clientThreads.begin(); ix != clientThreads.end(); ix++) {
Marc Slemko056f9ba2006-08-17 02:59:05 +0000375
Marc Slemkob09f5882006-08-23 22:03:34 +0000376 shared_ptr<ClientThread> client = dynamic_pointer_cast<ClientThread>((*ix)->runnable());
Marc Slemko056f9ba2006-08-17 02:59:05 +0000377
Marc Slemkob09f5882006-08-23 22:03:34 +0000378 long long delta = client->_endTime - client->_startTime;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000379
Marc Slemkob09f5882006-08-23 22:03:34 +0000380 assert(delta > 0);
Marc Slemko056f9ba2006-08-17 02:59:05 +0000381
Marc Slemkob09f5882006-08-23 22:03:34 +0000382 if(client->_startTime < firstTime) {
383 firstTime = client->_startTime;
384 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000385
Marc Slemkob09f5882006-08-23 22:03:34 +0000386 if(client->_endTime > lastTime) {
387 lastTime = client->_endTime;
388 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000389
Marc Slemkob09f5882006-08-23 22:03:34 +0000390 if(delta < minTime) {
391 minTime = delta;
392 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000393
Marc Slemkob09f5882006-08-23 22:03:34 +0000394 if(delta > maxTime) {
395 maxTime = delta;
396 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000397
Marc Slemkob09f5882006-08-23 22:03:34 +0000398 averageTime+= delta;
399 }
400
401 averageTime /= clientCount;
402
403
404 cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl;
405
406 cerr << "done." << endl;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000407 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000408
Marc Slemko3ea00332006-08-17 01:11:13 +0000409 return 0;
410}