/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
#include <concurrency/ThreadManager.h>
#include <concurrency/PosixThreadFactory.h>
#include <concurrency/Monitor.h>
#include <concurrency/Util.h>
#include <concurrency/Mutex.h>
#include <protocol/TBinaryProtocol.h>
#include <server/TSimpleServer.h>
#include <server/TThreadPoolServer.h>
#include <server/TThreadedServer.h>
#include <server/TNonblockingServer.h>
#include <transport/TServerSocket.h>
#include <transport/TSocket.h>
#include <transport/TTransportUtils.h>
#include <transport/TFileTransport.h>
#include <TLogging.h>

#include "Service.h"

#include <unistd.h>
#include <boost/shared_ptr.hpp>

#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <set>
#include <stdexcept>
#include <sstream>
#include <string>
#include <vector>

#include <map>
#include <ext/hash_map>
using __gnu_cxx::hash_map;
using __gnu_cxx::hash;

using namespace std;
using namespace boost;

using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::server;
using namespace apache::thrift::concurrency;

using namespace test::stress;
61
/**
 * Equality predicate for NUL-terminated C strings.
 *
 * Compares the characters the two pointers refer to (via strcmp), not the
 * pointer values themselves.
 */
struct eqstr {
  bool operator()(const char* lhs, const char* rhs) const {
    const int ordering = strcmp(lhs, rhs);
    return !ordering;
  }
};
67
/**
 * Strict-weak-ordering predicate for NUL-terminated C strings.
 *
 * Returns true when the first string sorts before the second according to
 * strcmp, so it can serve as the comparator of an ordered container keyed
 * by const char*.
 */
struct ltstr {
  bool operator()(const char* lhs, const char* rhs) const {
    const int ordering = strcmp(lhs, rhs);
    return ordering < 0;
  }
};
73
74
75// typedef hash_map<const char*, int, hash<const char*>, eqstr> count_map;
76typedef map<const char*, int, ltstr> count_map;
77
78class Server : public ServiceIf {
79 public:
80 Server() {}
81
82 void count(const char* method) {
Mark Slee79b16942007-11-26 19:05:29 +000083 Guard m(lock_);
Mark Sleee02385b2007-06-09 01:21:16 +000084 int ct = counts_[method];
85 counts_[method] = ++ct;
86 }
87
88 void echoVoid() {
89 count("echoVoid");
Mark Slee3e5d2d72007-06-15 01:45:56 +000090 // Sleep to simulate work
91 usleep(5000);
Mark Sleee02385b2007-06-09 01:21:16 +000092 return;
93 }
94
95 count_map getCount() {
Mark Slee79b16942007-11-26 19:05:29 +000096 Guard m(lock_);
Mark Sleee02385b2007-06-09 01:21:16 +000097 return counts_;
98 }
99
100 int8_t echoByte(const int8_t arg) {return arg;}
101 int32_t echoI32(const int32_t arg) {return arg;}
102 int64_t echoI64(const int64_t arg) {return arg;}
103 void echoString(string& out, const string &arg) {
104 if (arg != "hello") {
105 T_ERROR_ABORT("WRONG STRING!!!!");
106 }
107 out = arg;
108 }
109 void echoList(vector<int8_t> &out, const vector<int8_t> &arg) { out = arg; }
110 void echoSet(set<int8_t> &out, const set<int8_t> &arg) { out = arg; }
111 void echoMap(map<int8_t, int8_t> &out, const map<int8_t, int8_t> &arg) { out = arg; }
112
113private:
114 count_map counts_;
115 Mutex lock_;
116
117};
118
119class ClientThread: public Runnable {
120public:
121
122 ClientThread(shared_ptr<TTransport>transport, shared_ptr<ServiceClient> client, Monitor& monitor, size_t& workerCount, size_t loopCount, TType loopType) :
123 _transport(transport),
124 _client(client),
125 _monitor(monitor),
126 _workerCount(workerCount),
127 _loopCount(loopCount),
128 _loopType(loopType)
129 {}
130
131 void run() {
132
133 // Wait for all worker threads to start
134
135 {Synchronized s(_monitor);
136 while(_workerCount == 0) {
137 _monitor.wait();
138 }
139 }
140
141 _startTime = Util::currentTime();
142
143 _transport->open();
144
145 switch(_loopType) {
146 case T_VOID: loopEchoVoid(); break;
147 case T_BYTE: loopEchoByte(); break;
148 case T_I32: loopEchoI32(); break;
149 case T_I64: loopEchoI64(); break;
150 case T_STRING: loopEchoString(); break;
151 default: cerr << "Unexpected loop type" << _loopType << endl; break;
152 }
153
154 _endTime = Util::currentTime();
155
156 _transport->close();
157
158 _done = true;
159
160 {Synchronized s(_monitor);
161
162 _workerCount--;
163
Mark Slee3e5d2d72007-06-15 01:45:56 +0000164 if (_workerCount == 0) {
Mark Sleee02385b2007-06-09 01:21:16 +0000165
166 _monitor.notify();
167 }
168 }
169 }
170
171 void loopEchoVoid() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000172 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000173 _client->echoVoid();
174 }
175 }
176
177 void loopEchoByte() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000178 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000179 int8_t arg = 1;
180 int8_t result;
181 result =_client->echoByte(arg);
182 assert(result == arg);
183 }
184 }
185
186 void loopEchoI32() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000187 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000188 int32_t arg = 1;
189 int32_t result;
190 result =_client->echoI32(arg);
191 assert(result == arg);
192 }
193 }
194
195 void loopEchoI64() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000196 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000197 int64_t arg = 1;
198 int64_t result;
199 result =_client->echoI64(arg);
200 assert(result == arg);
201 }
202 }
203
204 void loopEchoString() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000205 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000206 string arg = "hello";
207 string result;
208 _client->echoString(result, arg);
209 assert(result == arg);
210 }
211 }
212
213 shared_ptr<TTransport> _transport;
214 shared_ptr<ServiceClient> _client;
215 Monitor& _monitor;
216 size_t& _workerCount;
217 size_t _loopCount;
218 TType _loopType;
219 long long _startTime;
220 long long _endTime;
221 bool _done;
222 Monitor _sleep;
223};
224
225
226int main(int argc, char **argv) {
227
228 int port = 9091;
229 string serverType = "simple";
230 string protocolType = "binary";
231 size_t workerCount = 4;
232 size_t clientCount = 20;
233 size_t loopCount = 50000;
234 TType loopType = T_VOID;
235 string callName = "echoVoid";
236 bool runServer = true;
237 bool logRequests = false;
238 string requestLogPath = "./requestlog.tlog";
239 bool replayRequests = false;
240
241 ostringstream usage;
242
243 usage <<
244 argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>] [--clients=<client-count>] [--loop=<loop-count>]" << endl <<
245 "\tclients Number of client threads to create - 0 implies no clients, i.e. server only. Default is " << clientCount << endl <<
246 "\thelp Prints this help text." << endl <<
247 "\tcall Service method to call. Default is " << callName << endl <<
248 "\tloop The number of remote thrift calls each client makes. Default is " << loopCount << endl <<
249 "\tport The port the server and clients should bind to for thrift network connections. Default is " << port << endl <<
250 "\tserver Run the Thrift server in this process. Default is " << runServer << endl <<
251 "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl <<
252 "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl <<
253 "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests << endl <<
254 "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " << replayRequests << endl <<
255 "\tworkers Number of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl;
256
257
258 map<string, string> args;
259
Mark Slee3e5d2d72007-06-15 01:45:56 +0000260 for (int ix = 1; ix < argc; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000261
262 string arg(argv[ix]);
263
Mark Slee3e5d2d72007-06-15 01:45:56 +0000264 if (arg.compare(0,2, "--") == 0) {
Mark Sleee02385b2007-06-09 01:21:16 +0000265
266 size_t end = arg.find_first_of("=", 2);
267
268 string key = string(arg, 2, end - 2);
269
Mark Slee3e5d2d72007-06-15 01:45:56 +0000270 if (end != string::npos) {
Mark Sleee02385b2007-06-09 01:21:16 +0000271 args[key] = string(arg, end + 1);
272 } else {
273 args[key] = "true";
274 }
275 } else {
276 throw invalid_argument("Unexcepted command line token: "+arg);
277 }
278 }
279
280 try {
281
Mark Slee3e5d2d72007-06-15 01:45:56 +0000282 if (!args["clients"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000283 clientCount = atoi(args["clients"].c_str());
284 }
285
Mark Slee3e5d2d72007-06-15 01:45:56 +0000286 if (!args["help"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000287 cerr << usage.str();
288 return 0;
289 }
290
Mark Slee3e5d2d72007-06-15 01:45:56 +0000291 if (!args["loop"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000292 loopCount = atoi(args["loop"].c_str());
293 }
294
Mark Slee3e5d2d72007-06-15 01:45:56 +0000295 if (!args["call"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000296 callName = args["call"];
297 }
298
Mark Slee3e5d2d72007-06-15 01:45:56 +0000299 if (!args["port"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000300 port = atoi(args["port"].c_str());
301 }
302
Mark Slee3e5d2d72007-06-15 01:45:56 +0000303 if (!args["server"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000304 runServer = args["server"] == "true";
305 }
306
Mark Slee3e5d2d72007-06-15 01:45:56 +0000307 if (!args["log-request"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000308 logRequests = args["log-request"] == "true";
309 }
310
Mark Slee3e5d2d72007-06-15 01:45:56 +0000311 if (!args["replay-request"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000312 replayRequests = args["replay-request"] == "true";
313 }
314
Mark Slee3e5d2d72007-06-15 01:45:56 +0000315 if (!args["server-type"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000316 serverType = args["server-type"];
317 }
318
Mark Slee3e5d2d72007-06-15 01:45:56 +0000319 if (!args["workers"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000320 workerCount = atoi(args["workers"].c_str());
321 }
322
323 } catch(exception& e) {
324 cerr << e.what() << endl;
325 cerr << usage;
326 }
327
328 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
329
330 // Dispatcher
331 shared_ptr<Server> serviceHandler(new Server());
332
333 if (replayRequests) {
334 shared_ptr<Server> serviceHandler(new Server());
335 shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
336
337 // Transports
338 shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
339 fileTransport->setChunkSize(2 * 1024 * 1024);
340 fileTransport->setMaxEventSize(1024 * 16);
341 fileTransport->seekToEnd();
342
343 // Protocol Factory
344 shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
345
346 TFileProcessor fileProcessor(serviceProcessor,
347 protocolFactory,
348 fileTransport);
349
350 fileProcessor.process(0, true);
351 exit(0);
352 }
353
354
Mark Slee3e5d2d72007-06-15 01:45:56 +0000355 if (runServer) {
Mark Sleee02385b2007-06-09 01:21:16 +0000356
357 shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
358
359 // Protocol Factory
360 shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
361
362 // Transport Factory
363 shared_ptr<TTransportFactory> transportFactory;
364
365 if (logRequests) {
366 // initialize the log file
367 shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
368 fileTransport->setChunkSize(2 * 1024 * 1024);
369 fileTransport->setMaxEventSize(1024 * 16);
370
371 transportFactory =
372 shared_ptr<TTransportFactory>(new TPipedTransportFactory(fileTransport));
373 }
374
375 shared_ptr<Thread> serverThread;
Mark Slee79b16942007-11-26 19:05:29 +0000376 shared_ptr<Thread> serverThread2;
Mark Sleee02385b2007-06-09 01:21:16 +0000377
Mark Slee3e5d2d72007-06-15 01:45:56 +0000378 if (serverType == "simple") {
Mark Sleee02385b2007-06-09 01:21:16 +0000379
Mark Slee3e5d2d72007-06-15 01:45:56 +0000380 serverThread = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port)));
Mark Slee79b16942007-11-26 19:05:29 +0000381 serverThread2 = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port+1)));
Mark Sleee02385b2007-06-09 01:21:16 +0000382
Mark Slee3e5d2d72007-06-15 01:45:56 +0000383 } else if (serverType == "thread-pool") {
Mark Sleee02385b2007-06-09 01:21:16 +0000384
385 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
386
387 threadManager->threadFactory(threadFactory);
388 threadManager->start();
389 serverThread = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port, threadManager)));
Mark Slee79b16942007-11-26 19:05:29 +0000390 serverThread2 = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port+1, threadManager)));
Mark Sleee02385b2007-06-09 01:21:16 +0000391 }
392
Mark Slee79b16942007-11-26 19:05:29 +0000393 cerr << "Starting the server on port " << port << " and " << (port + 1) << endl;
Mark Sleee02385b2007-06-09 01:21:16 +0000394 serverThread->start();
Mark Slee79b16942007-11-26 19:05:29 +0000395 serverThread2->start();
Mark Sleee02385b2007-06-09 01:21:16 +0000396
397 // If we aren't running clients, just wait forever for external clients
398
399 if (clientCount == 0) {
400 serverThread->join();
Mark Slee79b16942007-11-26 19:05:29 +0000401 serverThread2->join();
Mark Sleee02385b2007-06-09 01:21:16 +0000402 }
403 }
Mark Slee79b16942007-11-26 19:05:29 +0000404 sleep(1);
Mark Sleee02385b2007-06-09 01:21:16 +0000405
406 if (clientCount > 0) {
407
408 Monitor monitor;
409
410 size_t threadCount = 0;
411
412 set<shared_ptr<Thread> > clientThreads;
413
Mark Slee3e5d2d72007-06-15 01:45:56 +0000414 if (callName == "echoVoid") { loopType = T_VOID;}
415 else if (callName == "echoByte") { loopType = T_BYTE;}
416 else if (callName == "echoI32") { loopType = T_I32;}
417 else if (callName == "echoI64") { loopType = T_I64;}
418 else if (callName == "echoString") { loopType = T_STRING;}
Mark Sleee02385b2007-06-09 01:21:16 +0000419 else {throw invalid_argument("Unknown service call "+callName);}
420
Mark Slee3e5d2d72007-06-15 01:45:56 +0000421 for (size_t ix = 0; ix < clientCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000422
Mark Slee79b16942007-11-26 19:05:29 +0000423 shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port + (ix % 2)));
Mark Sleee02385b2007-06-09 01:21:16 +0000424 shared_ptr<TFramedTransport> framedSocket(new TFramedTransport(socket));
425 shared_ptr<TProtocol> protocol(new TBinaryProtocol(framedSocket));
426 shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
427
428 clientThreads.insert(threadFactory->newThread(shared_ptr<ClientThread>(new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType))));
429 }
430
Mark Slee3e5d2d72007-06-15 01:45:56 +0000431 for (std::set<shared_ptr<Thread> >::const_iterator thread = clientThreads.begin(); thread != clientThreads.end(); thread++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000432 (*thread)->start();
433 }
434
435 long long time00;
436 long long time01;
437
438 {Synchronized s(monitor);
439 threadCount = clientCount;
440
441 cerr << "Launch "<< clientCount << " client threads" << endl;
442
443 time00 = Util::currentTime();
444
445 monitor.notifyAll();
446
447 while(threadCount > 0) {
448 monitor.wait();
449 }
450
451 time01 = Util::currentTime();
452 }
453
454 long long firstTime = 9223372036854775807LL;
455 long long lastTime = 0;
456
457 double averageTime = 0;
458 long long minTime = 9223372036854775807LL;
459 long long maxTime = 0;
460
Mark Slee3e5d2d72007-06-15 01:45:56 +0000461 for (set<shared_ptr<Thread> >::iterator ix = clientThreads.begin(); ix != clientThreads.end(); ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000462
463 shared_ptr<ClientThread> client = dynamic_pointer_cast<ClientThread>((*ix)->runnable());
464
465 long long delta = client->_endTime - client->_startTime;
466
467 assert(delta > 0);
468
Mark Slee3e5d2d72007-06-15 01:45:56 +0000469 if (client->_startTime < firstTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000470 firstTime = client->_startTime;
471 }
472
Mark Slee3e5d2d72007-06-15 01:45:56 +0000473 if (client->_endTime > lastTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000474 lastTime = client->_endTime;
475 }
476
Mark Slee3e5d2d72007-06-15 01:45:56 +0000477 if (delta < minTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000478 minTime = delta;
479 }
480
Mark Slee3e5d2d72007-06-15 01:45:56 +0000481 if (delta > maxTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000482 maxTime = delta;
483 }
484
485 averageTime+= delta;
486 }
487
488 averageTime /= clientCount;
489
490
491 cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl;
492
493 count_map count = serviceHandler->getCount();
494 count_map::iterator iter;
495 for (iter = count.begin(); iter != count.end(); ++iter) {
496 printf("%s => %d\n", iter->first, iter->second);
497 }
498 cerr << "done." << endl;
499 }
500
501 return 0;
502}