blob: 8cee758d71c5b8531a45a2f5ac124ce0dea9518f [file] [log] [blame]
Marc Slemko3ea00332006-08-17 01:11:13 +00001#include <concurrency/ThreadManager.h>
2#include <concurrency/PosixThreadFactory.h>
3#include <concurrency/Monitor.h>
4#include <concurrency/Util.h>
5#include <protocol/TBinaryProtocol.h>
6#include <server/TSimpleServer.h>
7#include <server/TThreadPoolServer.h>
8#include <transport/TServerSocket.h>
9#include <transport/TSocket.h>
10#include <transport/TBufferedTransport.h>
11#include "StressTest.h"
12
13#include <iostream>
14#include <set>
15#include <stdexcept>
16#include <sstream>
17
18using namespace std;
19
20using namespace facebook::thrift;
21using namespace facebook::thrift::protocol;
22using namespace facebook::thrift::transport;
23using namespace facebook::thrift::server;
24
25using namespace test::stress;
26
27class Server : public ServiceServerIf {
28 public:
29 Server(shared_ptr<TProtocol> protocol) :
30 ServiceServerIf(protocol) {}
31
32 void echoVoid() {return;}
33 uint8_t echoByte(uint8_t arg) {return arg;}
34 uint16_t echoU16(uint16_t arg) {return arg;}
35 uint32_t echoU32(uint32_t arg) {return arg;}
36 uint64_t echoU64(uint64_t arg) {return arg;}
37};
38
39class ClientThread: public Runnable {
40public:
41
42 ClientThread(shared_ptr<TTransport>transport, shared_ptr<ServiceClient> client, Monitor& monitor, size_t& workerCount, size_t loopCount) :
43 _transport(transport),
44 _client(client),
45 _monitor(monitor),
46 _workerCount(workerCount),
47 _loopCount(loopCount)
48 {}
49
50 void run() {
51
52 // Wait for all worker threads to start
53
54 {Synchronized s(_monitor);
55 while(_workerCount == 0) {
56 _monitor.wait();
57 }
58 }
59
60 _startTime = Util::currentTime();
61
62 _transport->open();
63
64 //uint64_t arg = 0;
65 //uint64_t result = 0;
66
67 for(size_t ix = 0; ix < _loopCount; ix++) {
68 // result = _client->echoU64(arg);
69 // assert(result == arg);
70 _client->echoVoid();
71 //arg++;
72 }
73
74 _endTime = Util::currentTime();
75
76 _transport->close();
77
78 _done = true;
79
80 {Synchronized s(_monitor);
81
82 _workerCount--;
83
84 if(_workerCount == 0) {
85
86 _monitor.notify();
87 }
88 }
89 }
90
91private:
92 shared_ptr<TTransport> _transport;
93 shared_ptr<ServiceClient> _client;
94 Monitor& _monitor;
95 size_t& _workerCount;
96 size_t _loopCount;
97 long long _startTime;
98 long long _endTime;
99 bool _done;
100 Monitor _sleep;
101};
102
103
104int main(int argc, char **argv) {
105
106 int port = 9090;
107 string serverType = "thread-pool";
108 string protocolType = "binary";
109 size_t workerCount = 4;
110 size_t clientCount = 10;
111 size_t loopCount = 10000;
112
113 ostringstream usage;
114
115 usage <<
116 argv[0] << " [--port=<port number>] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>]" << endl <<
117
118 "\t\tserver-type\t\ttype of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl <<
119
120 "\t\tprotocol-type\t\ttype of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl <<
121
122 "\t\tworkers\t\tNumber of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl;
123
124 map<string, string> args;
125
126 for(int ix = 1; ix < argc; ix++) {
127
128 string arg(argv[ix]);
129
130 if(arg.compare(0,2, "--") == 0) {
131
132 size_t end = arg.find_first_of("=", 2);
133
134 if(end != string::npos) {
135 args[string(arg, 2, end - 2)] = string(arg, end + 1);
136 } else {
137 args[string(arg, 2, end - 2)] = "true";
138 }
139 ix++;
140 } else {
141 throw invalid_argument("Unexcepted command line token: "+arg);
142 }
143 }
144
145 try {
146
147 if(!args["port"].empty()) {
148 port = atoi(args["port"].c_str());
149 }
150
151 if(!args["server-type"].empty()) {
152 serverType = args["server-type"];
153
154 if(serverType == "simple") {
155
156 } else if(serverType == "thread-pool") {
157
158 } else {
159
160 throw invalid_argument("Unknown server type "+serverType);
161 }
162 }
163
164 if(!args["workers"].empty()) {
165 workerCount = atoi(args["workers"].c_str());
166 }
167
168 if(!args["clients"].empty()) {
169 clientCount = atoi(args["clients"].c_str());
170 }
171
172 if(!args["loop"].empty()) {
173 loopCount = atoi(args["loop"].c_str());
174 }
175 } catch(exception& e) {
176 cerr << e.what() << endl;
177 cerr << usage;
178 }
179
180 // Dispatcher
181 shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol);
182
183 shared_ptr<Server> server(new Server(binaryProtocol));
184
185 // Options
186 shared_ptr<TServerOptions> serverOptions(new TServerOptions());
187
188 // Transport
189 shared_ptr<TServerSocket> serverSocket(new TServerSocket(port));
190
191 // ThreadFactory
192
193 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
194
195 shared_ptr<Thread> serverThread;
196
197 if(serverType == "simple") {
198
199 serverThread = threadFactory->newThread(shared_ptr<Runnable>(new TSimpleServer(server, serverOptions, serverSocket)));
200
201 } else if(serverType == "thread-pool") {
202
203 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
204
205 threadManager->threadFactory(threadFactory);
206
207 threadManager->start();
208
209 serverThread = threadFactory->newThread(shared_ptr<TServer>(new TThreadPoolServer(server,
210 serverOptions,
211 serverSocket,
212 threadManager)));
213 }
214
215 cout << "Starting the server on port " << port << endl;
216
217 serverThread->start();
218
219 Monitor monitor;
220
221 size_t threadCount = 0;
222
223 set<shared_ptr<Thread> > clientThreads;
224
225 for(size_t ix = 0; ix < clientCount; ix++) {
226
227 shared_ptr<TSocket> socket(new TSocket("127.0.01", port));
228 shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
229 shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol());
230 shared_ptr<ServiceClient> serviceClient(new ServiceClient(bufferedSocket, binaryProtocol));
231
232 clientThreads.insert(threadFactory->newThread(shared_ptr<ClientThread>(new ClientThread(bufferedSocket, serviceClient, monitor, threadCount, loopCount))));
233 }
234
235 for(std::set<shared_ptr<Thread> >::const_iterator thread = clientThreads.begin(); thread != clientThreads.end(); thread++) {
236 (*thread)->start();
237 }
238
239 cout << endl;
240
241 {Synchronized s(monitor);
242 threadCount = clientCount;
243
244 cout << "Launch "<< clientCount << " client threads" << endl;
245 monitor.notifyAll();
246
247 while(threadCount > 0) {
248 monitor.wait();
249 }
250 }
251
252 printf("done.\n");
253 return 0;
254}