blob: 69a6eb05db3e22d2a470a94320351110dbd47cfd [file] [log] [blame]
Marc Slemko3ea00332006-08-17 01:11:13 +00001#include <concurrency/ThreadManager.h>
2#include <concurrency/PosixThreadFactory.h>
3#include <concurrency/Monitor.h>
4#include <concurrency/Util.h>
5#include <protocol/TBinaryProtocol.h>
6#include <server/TSimpleServer.h>
7#include <server/TThreadPoolServer.h>
8#include <transport/TServerSocket.h>
9#include <transport/TSocket.h>
10#include <transport/TBufferedTransport.h>
11#include "StressTest.h"
12
13#include <iostream>
14#include <set>
15#include <stdexcept>
16#include <sstream>
17
18using namespace std;
19
20using namespace facebook::thrift;
21using namespace facebook::thrift::protocol;
22using namespace facebook::thrift::transport;
23using namespace facebook::thrift::server;
24
25using namespace test::stress;
26
27class Server : public ServiceServerIf {
28 public:
29 Server(shared_ptr<TProtocol> protocol) :
30 ServiceServerIf(protocol) {}
31
32 void echoVoid() {return;}
33 uint8_t echoByte(uint8_t arg) {return arg;}
34 uint16_t echoU16(uint16_t arg) {return arg;}
35 uint32_t echoU32(uint32_t arg) {return arg;}
36 uint64_t echoU64(uint64_t arg) {return arg;}
Marc Slemkob09f5882006-08-23 22:03:34 +000037 string echoString(string arg) {return arg;}
38 list<uint8_t> echoList(list<uint8_t> arg) {return arg;}
39 set<uint8_t> echoSet(set<uint8_t> arg) {return arg;}
40 map<uint8_t, uint8_t> echoMap(map<uint8_t, uint8_t> arg) {return arg;}
Marc Slemko3ea00332006-08-17 01:11:13 +000041};
42
43class ClientThread: public Runnable {
44public:
45
46 ClientThread(shared_ptr<TTransport>transport, shared_ptr<ServiceClient> client, Monitor& monitor, size_t& workerCount, size_t loopCount) :
47 _transport(transport),
48 _client(client),
49 _monitor(monitor),
50 _workerCount(workerCount),
51 _loopCount(loopCount)
52 {}
53
54 void run() {
55
56 // Wait for all worker threads to start
57
58 {Synchronized s(_monitor);
59 while(_workerCount == 0) {
60 _monitor.wait();
61 }
62 }
63
64 _startTime = Util::currentTime();
65
66 _transport->open();
67
68 //uint64_t arg = 0;
69 //uint64_t result = 0;
70
71 for(size_t ix = 0; ix < _loopCount; ix++) {
72 // result = _client->echoU64(arg);
73 // assert(result == arg);
74 _client->echoVoid();
75 //arg++;
76 }
77
78 _endTime = Util::currentTime();
79
80 _transport->close();
81
82 _done = true;
83
84 {Synchronized s(_monitor);
85
86 _workerCount--;
Marc Slemko056f9ba2006-08-17 02:59:05 +000087
Marc Slemko3ea00332006-08-17 01:11:13 +000088 if(_workerCount == 0) {
89
90 _monitor.notify();
91 }
92 }
93 }
94
Marc Slemko3ea00332006-08-17 01:11:13 +000095 shared_ptr<TTransport> _transport;
96 shared_ptr<ServiceClient> _client;
97 Monitor& _monitor;
98 size_t& _workerCount;
99 size_t _loopCount;
100 long long _startTime;
101 long long _endTime;
102 bool _done;
103 Monitor _sleep;
104};
105
Marc Slemko3ea00332006-08-17 01:11:13 +0000106int main(int argc, char **argv) {
107
108 int port = 9090;
109 string serverType = "thread-pool";
110 string protocolType = "binary";
111 size_t workerCount = 4;
112 size_t clientCount = 10;
113 size_t loopCount = 10000;
Marc Slemkob09f5882006-08-23 22:03:34 +0000114 bool runServer = true;
Marc Slemko3ea00332006-08-17 01:11:13 +0000115
Marc Slemkob09f5882006-08-23 22:03:34 +0000116 ostringstream usage;
Marc Slemko3ea00332006-08-17 01:11:13 +0000117
118 usage <<
Marc Slemkob09f5882006-08-23 22:03:34 +0000119 argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>] [--clients=<client-count>] [--loop=<loop-count>]" << endl <<
120 "\tclients Number of client threads to create - 0 implies no clients, i.e. server only. Default is " << clientCount << endl <<
121 "\thelp Prints this help text." << endl <<
122 "\tloop The number of remote thrift calls each client makes. Default is " << loopCount << endl <<
123 "\tport The port the server and clients should bind to for thrift network connections. Default is " << port << endl <<
124 "\tserver Run the Thrift server in this process. Default is " << runServer << endl <<
125 "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl <<
126 "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl <<
127 "\tworkers Number of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl;
Marc Slemko3ea00332006-08-17 01:11:13 +0000128
129 map<string, string> args;
130
131 for(int ix = 1; ix < argc; ix++) {
132
133 string arg(argv[ix]);
134
135 if(arg.compare(0,2, "--") == 0) {
136
137 size_t end = arg.find_first_of("=", 2);
138
Marc Slemko056f9ba2006-08-17 02:59:05 +0000139 string key = string(arg, 2, end - 2);
140
Marc Slemko3ea00332006-08-17 01:11:13 +0000141 if(end != string::npos) {
Marc Slemko056f9ba2006-08-17 02:59:05 +0000142 args[key] = string(arg, end + 1);
Marc Slemko3ea00332006-08-17 01:11:13 +0000143 } else {
Marc Slemko056f9ba2006-08-17 02:59:05 +0000144 args[key] = "true";
Marc Slemko3ea00332006-08-17 01:11:13 +0000145 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000146 } else {
147 throw invalid_argument("Unexcepted command line token: "+arg);
148 }
149 }
150
151 try {
152
Marc Slemkob09f5882006-08-23 22:03:34 +0000153 if(!args["clients"].empty()) {
154 clientCount = atoi(args["clients"].c_str());
155 }
156
157 if(!args["help"].empty()) {
158 cerr << usage.str();
159 return 0;
160 }
161
162 if(!args["loop"].empty()) {
163 loopCount = atoi(args["loop"].c_str());
164 }
165
Marc Slemko3ea00332006-08-17 01:11:13 +0000166 if(!args["port"].empty()) {
167 port = atoi(args["port"].c_str());
168 }
169
Marc Slemkob09f5882006-08-23 22:03:34 +0000170 if(!args["server"].empty()) {
171 runServer = args["server"] == "true";
172 }
173
Marc Slemko3ea00332006-08-17 01:11:13 +0000174 if(!args["server-type"].empty()) {
175 serverType = args["server-type"];
176
177 if(serverType == "simple") {
178
179 } else if(serverType == "thread-pool") {
180
181 } else {
182
183 throw invalid_argument("Unknown server type "+serverType);
184 }
185 }
186
187 if(!args["workers"].empty()) {
188 workerCount = atoi(args["workers"].c_str());
189 }
190
Marc Slemko3ea00332006-08-17 01:11:13 +0000191 } catch(exception& e) {
192 cerr << e.what() << endl;
193 cerr << usage;
194 }
195
Marc Slemko3ea00332006-08-17 01:11:13 +0000196 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
197
Marc Slemkob09f5882006-08-23 22:03:34 +0000198 if(runServer) {
Marc Slemko3ea00332006-08-17 01:11:13 +0000199
Marc Slemkob09f5882006-08-23 22:03:34 +0000200 // Dispatcher
201 shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol);
Marc Slemko3ea00332006-08-17 01:11:13 +0000202
Marc Slemkob09f5882006-08-23 22:03:34 +0000203 shared_ptr<Server> server(new Server(binaryProtocol));
Marc Slemko3ea00332006-08-17 01:11:13 +0000204
Marc Slemkob09f5882006-08-23 22:03:34 +0000205 // Options
206 shared_ptr<TServerOptions> serverOptions(new TServerOptions());
Marc Slemko3ea00332006-08-17 01:11:13 +0000207
Marc Slemkob09f5882006-08-23 22:03:34 +0000208 // Transport
209 shared_ptr<TServerSocket> serverSocket(new TServerSocket(port));
Marc Slemko3ea00332006-08-17 01:11:13 +0000210
Marc Slemkob09f5882006-08-23 22:03:34 +0000211 // ThreadFactory
Marc Slemko3ea00332006-08-17 01:11:13 +0000212
Marc Slemkob09f5882006-08-23 22:03:34 +0000213 shared_ptr<Thread> serverThread;
Marc Slemko3ea00332006-08-17 01:11:13 +0000214
Marc Slemkob09f5882006-08-23 22:03:34 +0000215 if(serverType == "simple") {
216
217 serverThread = threadFactory->newThread(shared_ptr<Runnable>(new TSimpleServer(server, serverOptions, serverSocket)));
218
219 } else if(serverType == "thread-pool") {
220
221 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
222
223 threadManager->threadFactory(threadFactory);
224
225 threadManager->start();
226
227 serverThread = threadFactory->newThread(shared_ptr<TServer>(new TThreadPoolServer(server,
228 serverOptions,
229 serverSocket,
230 threadManager)));
231 }
232
233 cerr << "Starting the server on port " << port << endl;
234
235 serverThread->start();
236
237 // If we aren't running clients, just wait forever for external clients
238
239 if(clientCount == 0) {
240 serverThread->join();
241 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000242 }
243
Marc Slemkob09f5882006-08-23 22:03:34 +0000244 if(clientCount > 0) {
Marc Slemko3ea00332006-08-17 01:11:13 +0000245
Marc Slemkob09f5882006-08-23 22:03:34 +0000246 Monitor monitor;
Marc Slemko3ea00332006-08-17 01:11:13 +0000247
Marc Slemkob09f5882006-08-23 22:03:34 +0000248 size_t threadCount = 0;
Marc Slemko3ea00332006-08-17 01:11:13 +0000249
Marc Slemkob09f5882006-08-23 22:03:34 +0000250 set<shared_ptr<Thread> > clientThreads;
Marc Slemko3ea00332006-08-17 01:11:13 +0000251
Marc Slemkob09f5882006-08-23 22:03:34 +0000252 for(size_t ix = 0; ix < clientCount; ix++) {
Marc Slemko3ea00332006-08-17 01:11:13 +0000253
Marc Slemkob09f5882006-08-23 22:03:34 +0000254 shared_ptr<TSocket> socket(new TSocket("127.0.01", port));
255 shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
256 shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol());
257 shared_ptr<ServiceClient> serviceClient(new ServiceClient(bufferedSocket, binaryProtocol));
Marc Slemko3ea00332006-08-17 01:11:13 +0000258
Marc Slemkob09f5882006-08-23 22:03:34 +0000259 clientThreads.insert(threadFactory->newThread(shared_ptr<ClientThread>(new ClientThread(bufferedSocket, serviceClient, monitor, threadCount, loopCount))));
260 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000261
Marc Slemkob09f5882006-08-23 22:03:34 +0000262 for(std::set<shared_ptr<Thread> >::const_iterator thread = clientThreads.begin(); thread != clientThreads.end(); thread++) {
263 (*thread)->start();
264 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000265
Marc Slemkob09f5882006-08-23 22:03:34 +0000266 long long time00;
267 long long time01;
Marc Slemko3ea00332006-08-17 01:11:13 +0000268
Marc Slemkob09f5882006-08-23 22:03:34 +0000269 {Synchronized s(monitor);
270 threadCount = clientCount;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000271
Marc Slemkob09f5882006-08-23 22:03:34 +0000272 cerr << "Launch "<< clientCount << " client threads" << endl;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000273
Marc Slemkob09f5882006-08-23 22:03:34 +0000274 time00 = Util::currentTime();
Marc Slemko056f9ba2006-08-17 02:59:05 +0000275
Marc Slemkob09f5882006-08-23 22:03:34 +0000276 monitor.notifyAll();
277
278 while(threadCount > 0) {
279 monitor.wait();
280 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000281
Marc Slemkob09f5882006-08-23 22:03:34 +0000282 time01 = Util::currentTime();
Marc Slemko3ea00332006-08-17 01:11:13 +0000283 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000284
Marc Slemkob09f5882006-08-23 22:03:34 +0000285 long long firstTime = 9223372036854775807LL;
286 long long lastTime = 0;
Marc Slemko3ea00332006-08-17 01:11:13 +0000287
Marc Slemkob09f5882006-08-23 22:03:34 +0000288 double averageTime = 0;
289 long long minTime = 9223372036854775807LL;
290 long long maxTime = 0;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000291
Marc Slemkob09f5882006-08-23 22:03:34 +0000292 for(set<shared_ptr<Thread> >::iterator ix = clientThreads.begin(); ix != clientThreads.end(); ix++) {
Marc Slemko056f9ba2006-08-17 02:59:05 +0000293
Marc Slemkob09f5882006-08-23 22:03:34 +0000294 shared_ptr<ClientThread> client = dynamic_pointer_cast<ClientThread>((*ix)->runnable());
Marc Slemko056f9ba2006-08-17 02:59:05 +0000295
Marc Slemkob09f5882006-08-23 22:03:34 +0000296 long long delta = client->_endTime - client->_startTime;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000297
Marc Slemkob09f5882006-08-23 22:03:34 +0000298 assert(delta > 0);
Marc Slemko056f9ba2006-08-17 02:59:05 +0000299
Marc Slemkob09f5882006-08-23 22:03:34 +0000300 if(client->_startTime < firstTime) {
301 firstTime = client->_startTime;
302 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000303
Marc Slemkob09f5882006-08-23 22:03:34 +0000304 if(client->_endTime > lastTime) {
305 lastTime = client->_endTime;
306 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000307
Marc Slemkob09f5882006-08-23 22:03:34 +0000308 if(delta < minTime) {
309 minTime = delta;
310 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000311
Marc Slemkob09f5882006-08-23 22:03:34 +0000312 if(delta > maxTime) {
313 maxTime = delta;
314 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000315
Marc Slemkob09f5882006-08-23 22:03:34 +0000316 averageTime+= delta;
317 }
318
319 averageTime /= clientCount;
320
321
322 cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl;
323
324 cerr << "done." << endl;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000325 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000326
Marc Slemko3ea00332006-08-17 01:11:13 +0000327 return 0;
328}