blob: 2ff507b755af99d0fbcab5b41f6d176ee01b01e9 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
Mark Sleee02385b2007-06-09 01:21:16 +000020#include <concurrency/ThreadManager.h>
Roger Meier3faaedf2011-10-02 10:51:45 +000021#include <concurrency/PlatformThreadFactory.h>
Mark Sleee02385b2007-06-09 01:21:16 +000022#include <concurrency/Monitor.h>
23#include <concurrency/Util.h>
24#include <concurrency/Mutex.h>
25#include <protocol/TBinaryProtocol.h>
26#include <server/TSimpleServer.h>
27#include <server/TThreadPoolServer.h>
28#include <server/TThreadedServer.h>
29#include <server/TNonblockingServer.h>
30#include <transport/TServerSocket.h>
31#include <transport/TSocket.h>
32#include <transport/TTransportUtils.h>
33#include <transport/TFileTransport.h>
34#include <TLogging.h>
35
36#include "Service.h"
37
Mark Slee3e5d2d72007-06-15 01:45:56 +000038#include <unistd.h>
Mark Sleee02385b2007-06-09 01:21:16 +000039#include <boost/shared_ptr.hpp>
40
41#include <iostream>
42#include <set>
43#include <stdexcept>
44#include <sstream>
Mark Sleee02385b2007-06-09 01:21:16 +000045#include <map>
Mark Sleee02385b2007-06-09 01:21:16 +000046
47using namespace std;
48using namespace boost;
49
T Jake Lucianib5e62212009-01-31 22:36:20 +000050using namespace apache::thrift;
51using namespace apache::thrift::protocol;
52using namespace apache::thrift::transport;
53using namespace apache::thrift::server;
54using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000055
56using namespace test::stress;
57
/**
 * Equality functor for C strings: compares the pointed-to characters,
 * not the pointer values.
 */
struct eqstr {
  bool operator()(const char* lhs, const char* rhs) const {
    // strcmp yields zero exactly when both strings have the same content.
    return !strcmp(lhs, rhs);
  }
};
63
/**
 * Strict-weak-ordering functor for C strings: orders by character
 * content (as strcmp does) rather than by pointer value.
 */
struct ltstr {
  bool operator()(const char* lhs, const char* rhs) const {
    const int order = strcmp(lhs, rhs);
    return order < 0;
  }
};
69
70
71// typedef hash_map<const char*, int, hash<const char*>, eqstr> count_map;
72typedef map<const char*, int, ltstr> count_map;
73
74class Server : public ServiceIf {
75 public:
76 Server() {}
77
78 void count(const char* method) {
Mark Slee79b16942007-11-26 19:05:29 +000079 Guard m(lock_);
Mark Sleee02385b2007-06-09 01:21:16 +000080 int ct = counts_[method];
81 counts_[method] = ++ct;
82 }
83
84 void echoVoid() {
85 count("echoVoid");
Mark Slee3e5d2d72007-06-15 01:45:56 +000086 // Sleep to simulate work
87 usleep(5000);
Mark Sleee02385b2007-06-09 01:21:16 +000088 return;
89 }
90
91 count_map getCount() {
Mark Slee79b16942007-11-26 19:05:29 +000092 Guard m(lock_);
Mark Sleee02385b2007-06-09 01:21:16 +000093 return counts_;
94 }
95
96 int8_t echoByte(const int8_t arg) {return arg;}
97 int32_t echoI32(const int32_t arg) {return arg;}
98 int64_t echoI64(const int64_t arg) {return arg;}
99 void echoString(string& out, const string &arg) {
100 if (arg != "hello") {
Roger Meiera8cef6e2011-07-17 18:55:59 +0000101 T_ERROR_ABORT("WRONG STRING (%s)!!!!", arg.c_str());
Mark Sleee02385b2007-06-09 01:21:16 +0000102 }
103 out = arg;
104 }
105 void echoList(vector<int8_t> &out, const vector<int8_t> &arg) { out = arg; }
106 void echoSet(set<int8_t> &out, const set<int8_t> &arg) { out = arg; }
107 void echoMap(map<int8_t, int8_t> &out, const map<int8_t, int8_t> &arg) { out = arg; }
108
109private:
110 count_map counts_;
111 Mutex lock_;
112
113};
114
115class ClientThread: public Runnable {
116public:
117
118 ClientThread(shared_ptr<TTransport>transport, shared_ptr<ServiceClient> client, Monitor& monitor, size_t& workerCount, size_t loopCount, TType loopType) :
119 _transport(transport),
120 _client(client),
121 _monitor(monitor),
122 _workerCount(workerCount),
123 _loopCount(loopCount),
124 _loopType(loopType)
125 {}
126
127 void run() {
128
129 // Wait for all worker threads to start
130
131 {Synchronized s(_monitor);
132 while(_workerCount == 0) {
133 _monitor.wait();
134 }
135 }
136
137 _startTime = Util::currentTime();
138
139 _transport->open();
140
141 switch(_loopType) {
142 case T_VOID: loopEchoVoid(); break;
143 case T_BYTE: loopEchoByte(); break;
144 case T_I32: loopEchoI32(); break;
145 case T_I64: loopEchoI64(); break;
146 case T_STRING: loopEchoString(); break;
147 default: cerr << "Unexpected loop type" << _loopType << endl; break;
148 }
149
150 _endTime = Util::currentTime();
151
152 _transport->close();
153
154 _done = true;
155
156 {Synchronized s(_monitor);
157
158 _workerCount--;
159
Mark Slee3e5d2d72007-06-15 01:45:56 +0000160 if (_workerCount == 0) {
Mark Sleee02385b2007-06-09 01:21:16 +0000161
162 _monitor.notify();
163 }
164 }
165 }
166
167 void loopEchoVoid() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000168 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000169 _client->echoVoid();
170 }
171 }
172
173 void loopEchoByte() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000174 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000175 int8_t arg = 1;
176 int8_t result;
177 result =_client->echoByte(arg);
178 assert(result == arg);
179 }
180 }
181
182 void loopEchoI32() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000183 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000184 int32_t arg = 1;
185 int32_t result;
186 result =_client->echoI32(arg);
187 assert(result == arg);
188 }
189 }
190
191 void loopEchoI64() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000192 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000193 int64_t arg = 1;
194 int64_t result;
195 result =_client->echoI64(arg);
196 assert(result == arg);
197 }
198 }
199
200 void loopEchoString() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000201 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000202 string arg = "hello";
203 string result;
204 _client->echoString(result, arg);
205 assert(result == arg);
206 }
207 }
208
209 shared_ptr<TTransport> _transport;
210 shared_ptr<ServiceClient> _client;
211 Monitor& _monitor;
212 size_t& _workerCount;
213 size_t _loopCount;
214 TType _loopType;
Roger Meier5f9614c2010-11-21 16:59:05 +0000215 int64_t _startTime;
216 int64_t _endTime;
Mark Sleee02385b2007-06-09 01:21:16 +0000217 bool _done;
218 Monitor _sleep;
219};
220
221
222int main(int argc, char **argv) {
223
224 int port = 9091;
225 string serverType = "simple";
226 string protocolType = "binary";
227 size_t workerCount = 4;
228 size_t clientCount = 20;
229 size_t loopCount = 50000;
230 TType loopType = T_VOID;
231 string callName = "echoVoid";
232 bool runServer = true;
233 bool logRequests = false;
234 string requestLogPath = "./requestlog.tlog";
235 bool replayRequests = false;
236
237 ostringstream usage;
238
239 usage <<
240 argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>] [--clients=<client-count>] [--loop=<loop-count>]" << endl <<
241 "\tclients Number of client threads to create - 0 implies no clients, i.e. server only. Default is " << clientCount << endl <<
242 "\thelp Prints this help text." << endl <<
243 "\tcall Service method to call. Default is " << callName << endl <<
244 "\tloop The number of remote thrift calls each client makes. Default is " << loopCount << endl <<
245 "\tport The port the server and clients should bind to for thrift network connections. Default is " << port << endl <<
246 "\tserver Run the Thrift server in this process. Default is " << runServer << endl <<
247 "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl <<
248 "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl <<
249 "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests << endl <<
250 "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " << replayRequests << endl <<
251 "\tworkers Number of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl;
252
253
254 map<string, string> args;
255
Mark Slee3e5d2d72007-06-15 01:45:56 +0000256 for (int ix = 1; ix < argc; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000257
258 string arg(argv[ix]);
259
Mark Slee3e5d2d72007-06-15 01:45:56 +0000260 if (arg.compare(0,2, "--") == 0) {
Mark Sleee02385b2007-06-09 01:21:16 +0000261
262 size_t end = arg.find_first_of("=", 2);
263
264 string key = string(arg, 2, end - 2);
265
Mark Slee3e5d2d72007-06-15 01:45:56 +0000266 if (end != string::npos) {
Mark Sleee02385b2007-06-09 01:21:16 +0000267 args[key] = string(arg, end + 1);
268 } else {
269 args[key] = "true";
270 }
271 } else {
272 throw invalid_argument("Unexcepted command line token: "+arg);
273 }
274 }
275
276 try {
277
Mark Slee3e5d2d72007-06-15 01:45:56 +0000278 if (!args["clients"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000279 clientCount = atoi(args["clients"].c_str());
280 }
281
Mark Slee3e5d2d72007-06-15 01:45:56 +0000282 if (!args["help"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000283 cerr << usage.str();
284 return 0;
285 }
286
Mark Slee3e5d2d72007-06-15 01:45:56 +0000287 if (!args["loop"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000288 loopCount = atoi(args["loop"].c_str());
289 }
290
Mark Slee3e5d2d72007-06-15 01:45:56 +0000291 if (!args["call"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000292 callName = args["call"];
293 }
294
Mark Slee3e5d2d72007-06-15 01:45:56 +0000295 if (!args["port"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000296 port = atoi(args["port"].c_str());
297 }
298
Mark Slee3e5d2d72007-06-15 01:45:56 +0000299 if (!args["server"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000300 runServer = args["server"] == "true";
301 }
302
Mark Slee3e5d2d72007-06-15 01:45:56 +0000303 if (!args["log-request"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000304 logRequests = args["log-request"] == "true";
305 }
306
Mark Slee3e5d2d72007-06-15 01:45:56 +0000307 if (!args["replay-request"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000308 replayRequests = args["replay-request"] == "true";
309 }
310
Mark Slee3e5d2d72007-06-15 01:45:56 +0000311 if (!args["server-type"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000312 serverType = args["server-type"];
313 }
314
Mark Slee3e5d2d72007-06-15 01:45:56 +0000315 if (!args["workers"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000316 workerCount = atoi(args["workers"].c_str());
317 }
318
Roger Meierbb09f442011-05-31 20:35:37 +0000319 } catch(std::exception& e) {
Mark Sleee02385b2007-06-09 01:21:16 +0000320 cerr << e.what() << endl;
321 cerr << usage;
322 }
323
Roger Meier3faaedf2011-10-02 10:51:45 +0000324 shared_ptr<PlatformThreadFactory> threadFactory = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
Mark Sleee02385b2007-06-09 01:21:16 +0000325
326 // Dispatcher
327 shared_ptr<Server> serviceHandler(new Server());
328
329 if (replayRequests) {
330 shared_ptr<Server> serviceHandler(new Server());
331 shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
332
333 // Transports
334 shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
335 fileTransport->setChunkSize(2 * 1024 * 1024);
336 fileTransport->setMaxEventSize(1024 * 16);
337 fileTransport->seekToEnd();
338
339 // Protocol Factory
340 shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
341
342 TFileProcessor fileProcessor(serviceProcessor,
343 protocolFactory,
344 fileTransport);
345
346 fileProcessor.process(0, true);
347 exit(0);
348 }
349
350
Mark Slee3e5d2d72007-06-15 01:45:56 +0000351 if (runServer) {
Mark Sleee02385b2007-06-09 01:21:16 +0000352
353 shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
354
355 // Protocol Factory
356 shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
357
358 // Transport Factory
359 shared_ptr<TTransportFactory> transportFactory;
360
361 if (logRequests) {
362 // initialize the log file
363 shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
364 fileTransport->setChunkSize(2 * 1024 * 1024);
365 fileTransport->setMaxEventSize(1024 * 16);
366
367 transportFactory =
368 shared_ptr<TTransportFactory>(new TPipedTransportFactory(fileTransport));
369 }
370
371 shared_ptr<Thread> serverThread;
Mark Slee79b16942007-11-26 19:05:29 +0000372 shared_ptr<Thread> serverThread2;
Mark Sleee02385b2007-06-09 01:21:16 +0000373
Mark Slee3e5d2d72007-06-15 01:45:56 +0000374 if (serverType == "simple") {
Mark Sleee02385b2007-06-09 01:21:16 +0000375
Mark Slee3e5d2d72007-06-15 01:45:56 +0000376 serverThread = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port)));
Mark Slee79b16942007-11-26 19:05:29 +0000377 serverThread2 = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port+1)));
Mark Sleee02385b2007-06-09 01:21:16 +0000378
Mark Slee3e5d2d72007-06-15 01:45:56 +0000379 } else if (serverType == "thread-pool") {
Mark Sleee02385b2007-06-09 01:21:16 +0000380
381 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
382
383 threadManager->threadFactory(threadFactory);
384 threadManager->start();
385 serverThread = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port, threadManager)));
Mark Slee79b16942007-11-26 19:05:29 +0000386 serverThread2 = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port+1, threadManager)));
Mark Sleee02385b2007-06-09 01:21:16 +0000387 }
388
Mark Slee79b16942007-11-26 19:05:29 +0000389 cerr << "Starting the server on port " << port << " and " << (port + 1) << endl;
Mark Sleee02385b2007-06-09 01:21:16 +0000390 serverThread->start();
Mark Slee79b16942007-11-26 19:05:29 +0000391 serverThread2->start();
Mark Sleee02385b2007-06-09 01:21:16 +0000392
393 // If we aren't running clients, just wait forever for external clients
394
395 if (clientCount == 0) {
396 serverThread->join();
Mark Slee79b16942007-11-26 19:05:29 +0000397 serverThread2->join();
Mark Sleee02385b2007-06-09 01:21:16 +0000398 }
399 }
Mark Slee79b16942007-11-26 19:05:29 +0000400 sleep(1);
Mark Sleee02385b2007-06-09 01:21:16 +0000401
402 if (clientCount > 0) {
403
404 Monitor monitor;
405
406 size_t threadCount = 0;
407
408 set<shared_ptr<Thread> > clientThreads;
409
Mark Slee3e5d2d72007-06-15 01:45:56 +0000410 if (callName == "echoVoid") { loopType = T_VOID;}
411 else if (callName == "echoByte") { loopType = T_BYTE;}
412 else if (callName == "echoI32") { loopType = T_I32;}
413 else if (callName == "echoI64") { loopType = T_I64;}
414 else if (callName == "echoString") { loopType = T_STRING;}
Mark Sleee02385b2007-06-09 01:21:16 +0000415 else {throw invalid_argument("Unknown service call "+callName);}
416
Mark Slee3e5d2d72007-06-15 01:45:56 +0000417 for (size_t ix = 0; ix < clientCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000418
Mark Slee79b16942007-11-26 19:05:29 +0000419 shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port + (ix % 2)));
Mark Sleee02385b2007-06-09 01:21:16 +0000420 shared_ptr<TFramedTransport> framedSocket(new TFramedTransport(socket));
421 shared_ptr<TProtocol> protocol(new TBinaryProtocol(framedSocket));
422 shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
423
424 clientThreads.insert(threadFactory->newThread(shared_ptr<ClientThread>(new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType))));
425 }
426
Mark Slee3e5d2d72007-06-15 01:45:56 +0000427 for (std::set<shared_ptr<Thread> >::const_iterator thread = clientThreads.begin(); thread != clientThreads.end(); thread++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000428 (*thread)->start();
429 }
430
Roger Meier5f9614c2010-11-21 16:59:05 +0000431 int64_t time00;
432 int64_t time01;
Mark Sleee02385b2007-06-09 01:21:16 +0000433
434 {Synchronized s(monitor);
435 threadCount = clientCount;
436
437 cerr << "Launch "<< clientCount << " client threads" << endl;
438
439 time00 = Util::currentTime();
440
441 monitor.notifyAll();
442
443 while(threadCount > 0) {
444 monitor.wait();
445 }
446
447 time01 = Util::currentTime();
448 }
449
Roger Meier5f9614c2010-11-21 16:59:05 +0000450 int64_t firstTime = 9223372036854775807LL;
451 int64_t lastTime = 0;
Mark Sleee02385b2007-06-09 01:21:16 +0000452
453 double averageTime = 0;
Roger Meier5f9614c2010-11-21 16:59:05 +0000454 int64_t minTime = 9223372036854775807LL;
455 int64_t maxTime = 0;
Mark Sleee02385b2007-06-09 01:21:16 +0000456
Mark Slee3e5d2d72007-06-15 01:45:56 +0000457 for (set<shared_ptr<Thread> >::iterator ix = clientThreads.begin(); ix != clientThreads.end(); ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000458
459 shared_ptr<ClientThread> client = dynamic_pointer_cast<ClientThread>((*ix)->runnable());
460
Roger Meier5f9614c2010-11-21 16:59:05 +0000461 int64_t delta = client->_endTime - client->_startTime;
Mark Sleee02385b2007-06-09 01:21:16 +0000462
463 assert(delta > 0);
464
Mark Slee3e5d2d72007-06-15 01:45:56 +0000465 if (client->_startTime < firstTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000466 firstTime = client->_startTime;
467 }
468
Mark Slee3e5d2d72007-06-15 01:45:56 +0000469 if (client->_endTime > lastTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000470 lastTime = client->_endTime;
471 }
472
Mark Slee3e5d2d72007-06-15 01:45:56 +0000473 if (delta < minTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000474 minTime = delta;
475 }
476
Mark Slee3e5d2d72007-06-15 01:45:56 +0000477 if (delta > maxTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000478 maxTime = delta;
479 }
480
481 averageTime+= delta;
482 }
483
484 averageTime /= clientCount;
485
486
487 cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl;
488
489 count_map count = serviceHandler->getCount();
490 count_map::iterator iter;
491 for (iter = count.begin(); iter != count.end(); ++iter) {
492 printf("%s => %d\n", iter->first, iter->second);
493 }
494 cerr << "done." << endl;
495 }
496
497 return 0;
498}