blob: 9fc1beb9be4b12294947d434104fbc8d7f8bfa1a [file] [log] [blame]
Mark Sleee02385b2007-06-09 01:21:16 +00001#include <concurrency/ThreadManager.h>
2#include <concurrency/PosixThreadFactory.h>
3#include <concurrency/Monitor.h>
4#include <concurrency/Util.h>
5#include <concurrency/Mutex.h>
6#include <protocol/TBinaryProtocol.h>
7#include <server/TSimpleServer.h>
8#include <server/TThreadPoolServer.h>
9#include <server/TThreadedServer.h>
10#include <server/TNonblockingServer.h>
11#include <transport/TServerSocket.h>
12#include <transport/TSocket.h>
13#include <transport/TTransportUtils.h>
14#include <transport/TFileTransport.h>
15#include <TLogging.h>
16
17#include "Service.h"
18
Mark Slee3e5d2d72007-06-15 01:45:56 +000019#include <unistd.h>
Mark Sleee02385b2007-06-09 01:21:16 +000020#include <boost/shared_ptr.hpp>
21
22#include <iostream>
23#include <set>
24#include <stdexcept>
25#include <sstream>
26
27#include <map>
28#include <ext/hash_map>
29using __gnu_cxx::hash_map;
30using __gnu_cxx::hash;
31
32using namespace std;
33using namespace boost;
34
35using namespace facebook::thrift;
36using namespace facebook::thrift::protocol;
37using namespace facebook::thrift::transport;
38using namespace facebook::thrift::server;
39using namespace facebook::thrift::concurrency;
40
41using namespace test::stress;
42
43struct eqstr {
44 bool operator()(const char* s1, const char* s2) const {
45 return strcmp(s1, s2) == 0;
46 }
47};
48
49struct ltstr {
50 bool operator()(const char* s1, const char* s2) const {
51 return strcmp(s1, s2) < 0;
52 }
53};
54
55
56// typedef hash_map<const char*, int, hash<const char*>, eqstr> count_map;
57typedef map<const char*, int, ltstr> count_map;
58
59class Server : public ServiceIf {
60 public:
61 Server() {}
62
63 void count(const char* method) {
64 MutexMonitor m(lock_);
65 int ct = counts_[method];
66 counts_[method] = ++ct;
67 }
68
69 void echoVoid() {
70 count("echoVoid");
Mark Slee3e5d2d72007-06-15 01:45:56 +000071 // Sleep to simulate work
72 usleep(5000);
Mark Sleee02385b2007-06-09 01:21:16 +000073 return;
74 }
75
76 count_map getCount() {
77 MutexMonitor m(lock_);
78 return counts_;
79 }
80
81 int8_t echoByte(const int8_t arg) {return arg;}
82 int32_t echoI32(const int32_t arg) {return arg;}
83 int64_t echoI64(const int64_t arg) {return arg;}
84 void echoString(string& out, const string &arg) {
85 if (arg != "hello") {
86 T_ERROR_ABORT("WRONG STRING!!!!");
87 }
88 out = arg;
89 }
90 void echoList(vector<int8_t> &out, const vector<int8_t> &arg) { out = arg; }
91 void echoSet(set<int8_t> &out, const set<int8_t> &arg) { out = arg; }
92 void echoMap(map<int8_t, int8_t> &out, const map<int8_t, int8_t> &arg) { out = arg; }
93
94private:
95 count_map counts_;
96 Mutex lock_;
97
98};
99
100class ClientThread: public Runnable {
101public:
102
103 ClientThread(shared_ptr<TTransport>transport, shared_ptr<ServiceClient> client, Monitor& monitor, size_t& workerCount, size_t loopCount, TType loopType) :
104 _transport(transport),
105 _client(client),
106 _monitor(monitor),
107 _workerCount(workerCount),
108 _loopCount(loopCount),
109 _loopType(loopType)
110 {}
111
112 void run() {
113
114 // Wait for all worker threads to start
115
116 {Synchronized s(_monitor);
117 while(_workerCount == 0) {
118 _monitor.wait();
119 }
120 }
121
122 _startTime = Util::currentTime();
123
124 _transport->open();
125
126 switch(_loopType) {
127 case T_VOID: loopEchoVoid(); break;
128 case T_BYTE: loopEchoByte(); break;
129 case T_I32: loopEchoI32(); break;
130 case T_I64: loopEchoI64(); break;
131 case T_STRING: loopEchoString(); break;
132 default: cerr << "Unexpected loop type" << _loopType << endl; break;
133 }
134
135 _endTime = Util::currentTime();
136
137 _transport->close();
138
139 _done = true;
140
141 {Synchronized s(_monitor);
142
143 _workerCount--;
144
Mark Slee3e5d2d72007-06-15 01:45:56 +0000145 if (_workerCount == 0) {
Mark Sleee02385b2007-06-09 01:21:16 +0000146
147 _monitor.notify();
148 }
149 }
150 }
151
152 void loopEchoVoid() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000153 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000154 _client->echoVoid();
155 }
156 }
157
158 void loopEchoByte() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000159 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000160 int8_t arg = 1;
161 int8_t result;
162 result =_client->echoByte(arg);
163 assert(result == arg);
164 }
165 }
166
167 void loopEchoI32() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000168 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000169 int32_t arg = 1;
170 int32_t result;
171 result =_client->echoI32(arg);
172 assert(result == arg);
173 }
174 }
175
176 void loopEchoI64() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000177 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000178 int64_t arg = 1;
179 int64_t result;
180 result =_client->echoI64(arg);
181 assert(result == arg);
182 }
183 }
184
185 void loopEchoString() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000186 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000187 string arg = "hello";
188 string result;
189 _client->echoString(result, arg);
190 assert(result == arg);
191 }
192 }
193
194 shared_ptr<TTransport> _transport;
195 shared_ptr<ServiceClient> _client;
196 Monitor& _monitor;
197 size_t& _workerCount;
198 size_t _loopCount;
199 TType _loopType;
200 long long _startTime;
201 long long _endTime;
202 bool _done;
203 Monitor _sleep;
204};
205
206
207int main(int argc, char **argv) {
208
209 int port = 9091;
210 string serverType = "simple";
211 string protocolType = "binary";
212 size_t workerCount = 4;
213 size_t clientCount = 20;
214 size_t loopCount = 50000;
215 TType loopType = T_VOID;
216 string callName = "echoVoid";
217 bool runServer = true;
218 bool logRequests = false;
219 string requestLogPath = "./requestlog.tlog";
220 bool replayRequests = false;
221
222 ostringstream usage;
223
224 usage <<
225 argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>] [--clients=<client-count>] [--loop=<loop-count>]" << endl <<
226 "\tclients Number of client threads to create - 0 implies no clients, i.e. server only. Default is " << clientCount << endl <<
227 "\thelp Prints this help text." << endl <<
228 "\tcall Service method to call. Default is " << callName << endl <<
229 "\tloop The number of remote thrift calls each client makes. Default is " << loopCount << endl <<
230 "\tport The port the server and clients should bind to for thrift network connections. Default is " << port << endl <<
231 "\tserver Run the Thrift server in this process. Default is " << runServer << endl <<
232 "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl <<
233 "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl <<
234 "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests << endl <<
235 "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " << replayRequests << endl <<
236 "\tworkers Number of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl;
237
238
239 map<string, string> args;
240
Mark Slee3e5d2d72007-06-15 01:45:56 +0000241 for (int ix = 1; ix < argc; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000242
243 string arg(argv[ix]);
244
Mark Slee3e5d2d72007-06-15 01:45:56 +0000245 if (arg.compare(0,2, "--") == 0) {
Mark Sleee02385b2007-06-09 01:21:16 +0000246
247 size_t end = arg.find_first_of("=", 2);
248
249 string key = string(arg, 2, end - 2);
250
Mark Slee3e5d2d72007-06-15 01:45:56 +0000251 if (end != string::npos) {
Mark Sleee02385b2007-06-09 01:21:16 +0000252 args[key] = string(arg, end + 1);
253 } else {
254 args[key] = "true";
255 }
256 } else {
257 throw invalid_argument("Unexcepted command line token: "+arg);
258 }
259 }
260
261 try {
262
Mark Slee3e5d2d72007-06-15 01:45:56 +0000263 if (!args["clients"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000264 clientCount = atoi(args["clients"].c_str());
265 }
266
Mark Slee3e5d2d72007-06-15 01:45:56 +0000267 if (!args["help"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000268 cerr << usage.str();
269 return 0;
270 }
271
Mark Slee3e5d2d72007-06-15 01:45:56 +0000272 if (!args["loop"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000273 loopCount = atoi(args["loop"].c_str());
274 }
275
Mark Slee3e5d2d72007-06-15 01:45:56 +0000276 if (!args["call"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000277 callName = args["call"];
278 }
279
Mark Slee3e5d2d72007-06-15 01:45:56 +0000280 if (!args["port"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000281 port = atoi(args["port"].c_str());
282 }
283
Mark Slee3e5d2d72007-06-15 01:45:56 +0000284 if (!args["server"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000285 runServer = args["server"] == "true";
286 }
287
Mark Slee3e5d2d72007-06-15 01:45:56 +0000288 if (!args["log-request"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000289 logRequests = args["log-request"] == "true";
290 }
291
Mark Slee3e5d2d72007-06-15 01:45:56 +0000292 if (!args["replay-request"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000293 replayRequests = args["replay-request"] == "true";
294 }
295
Mark Slee3e5d2d72007-06-15 01:45:56 +0000296 if (!args["server-type"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000297 serverType = args["server-type"];
298 }
299
Mark Slee3e5d2d72007-06-15 01:45:56 +0000300 if (!args["workers"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000301 workerCount = atoi(args["workers"].c_str());
302 }
303
304 } catch(exception& e) {
305 cerr << e.what() << endl;
306 cerr << usage;
307 }
308
309 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
310
311 // Dispatcher
312 shared_ptr<Server> serviceHandler(new Server());
313
314 if (replayRequests) {
315 shared_ptr<Server> serviceHandler(new Server());
316 shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
317
318 // Transports
319 shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
320 fileTransport->setChunkSize(2 * 1024 * 1024);
321 fileTransport->setMaxEventSize(1024 * 16);
322 fileTransport->seekToEnd();
323
324 // Protocol Factory
325 shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
326
327 TFileProcessor fileProcessor(serviceProcessor,
328 protocolFactory,
329 fileTransport);
330
331 fileProcessor.process(0, true);
332 exit(0);
333 }
334
335
Mark Slee3e5d2d72007-06-15 01:45:56 +0000336 if (runServer) {
Mark Sleee02385b2007-06-09 01:21:16 +0000337
338 shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
339
340 // Protocol Factory
341 shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
342
343 // Transport Factory
344 shared_ptr<TTransportFactory> transportFactory;
345
346 if (logRequests) {
347 // initialize the log file
348 shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
349 fileTransport->setChunkSize(2 * 1024 * 1024);
350 fileTransport->setMaxEventSize(1024 * 16);
351
352 transportFactory =
353 shared_ptr<TTransportFactory>(new TPipedTransportFactory(fileTransport));
354 }
355
356 shared_ptr<Thread> serverThread;
357
Mark Slee3e5d2d72007-06-15 01:45:56 +0000358 if (serverType == "simple") {
Mark Sleee02385b2007-06-09 01:21:16 +0000359
Mark Slee3e5d2d72007-06-15 01:45:56 +0000360 serverThread = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port)));
Mark Sleee02385b2007-06-09 01:21:16 +0000361
Mark Slee3e5d2d72007-06-15 01:45:56 +0000362 } else if (serverType == "thread-pool") {
Mark Sleee02385b2007-06-09 01:21:16 +0000363
364 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
365
366 threadManager->threadFactory(threadFactory);
367 threadManager->start();
368 serverThread = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port, threadManager)));
369 }
370
371 cerr << "Starting the server on port " << port << endl;
372 serverThread->start();
373
374 // If we aren't running clients, just wait forever for external clients
375
376 if (clientCount == 0) {
377 serverThread->join();
378 }
379 }
380
381 if (clientCount > 0) {
382
383 Monitor monitor;
384
385 size_t threadCount = 0;
386
387 set<shared_ptr<Thread> > clientThreads;
388
Mark Slee3e5d2d72007-06-15 01:45:56 +0000389 if (callName == "echoVoid") { loopType = T_VOID;}
390 else if (callName == "echoByte") { loopType = T_BYTE;}
391 else if (callName == "echoI32") { loopType = T_I32;}
392 else if (callName == "echoI64") { loopType = T_I64;}
393 else if (callName == "echoString") { loopType = T_STRING;}
Mark Sleee02385b2007-06-09 01:21:16 +0000394 else {throw invalid_argument("Unknown service call "+callName);}
395
Mark Slee3e5d2d72007-06-15 01:45:56 +0000396 for (size_t ix = 0; ix < clientCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000397
398 shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port));
399 shared_ptr<TFramedTransport> framedSocket(new TFramedTransport(socket));
400 shared_ptr<TProtocol> protocol(new TBinaryProtocol(framedSocket));
401 shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
402
403 clientThreads.insert(threadFactory->newThread(shared_ptr<ClientThread>(new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType))));
404 }
405
Mark Slee3e5d2d72007-06-15 01:45:56 +0000406 for (std::set<shared_ptr<Thread> >::const_iterator thread = clientThreads.begin(); thread != clientThreads.end(); thread++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000407 (*thread)->start();
408 }
409
410 long long time00;
411 long long time01;
412
413 {Synchronized s(monitor);
414 threadCount = clientCount;
415
416 cerr << "Launch "<< clientCount << " client threads" << endl;
417
418 time00 = Util::currentTime();
419
420 monitor.notifyAll();
421
422 while(threadCount > 0) {
423 monitor.wait();
424 }
425
426 time01 = Util::currentTime();
427 }
428
429 long long firstTime = 9223372036854775807LL;
430 long long lastTime = 0;
431
432 double averageTime = 0;
433 long long minTime = 9223372036854775807LL;
434 long long maxTime = 0;
435
Mark Slee3e5d2d72007-06-15 01:45:56 +0000436 for (set<shared_ptr<Thread> >::iterator ix = clientThreads.begin(); ix != clientThreads.end(); ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000437
438 shared_ptr<ClientThread> client = dynamic_pointer_cast<ClientThread>((*ix)->runnable());
439
440 long long delta = client->_endTime - client->_startTime;
441
442 assert(delta > 0);
443
Mark Slee3e5d2d72007-06-15 01:45:56 +0000444 if (client->_startTime < firstTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000445 firstTime = client->_startTime;
446 }
447
Mark Slee3e5d2d72007-06-15 01:45:56 +0000448 if (client->_endTime > lastTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000449 lastTime = client->_endTime;
450 }
451
Mark Slee3e5d2d72007-06-15 01:45:56 +0000452 if (delta < minTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000453 minTime = delta;
454 }
455
Mark Slee3e5d2d72007-06-15 01:45:56 +0000456 if (delta > maxTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000457 maxTime = delta;
458 }
459
460 averageTime+= delta;
461 }
462
463 averageTime /= clientCount;
464
465
466 cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl;
467
468 count_map count = serviceHandler->getCount();
469 count_map::iterator iter;
470 for (iter = count.begin(); iter != count.end(); ++iter) {
471 printf("%s => %d\n", iter->first, iter->second);
472 }
473 cerr << "done." << endl;
474 }
475
476 return 0;
477}