blob: 1d3ed732bca769166309eb4d151933f753525e11 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
Roger Meier49ff8b12012-04-13 09:12:31 +000020#include <thrift/concurrency/ThreadManager.h>
21#include <thrift/concurrency/PlatformThreadFactory.h>
22#include <thrift/concurrency/Monitor.h>
23#include <thrift/concurrency/Util.h>
24#include <thrift/concurrency/Mutex.h>
25#include <thrift/protocol/TBinaryProtocol.h>
26#include <thrift/server/TSimpleServer.h>
27#include <thrift/server/TThreadPoolServer.h>
28#include <thrift/server/TThreadedServer.h>
29#include <thrift/server/TNonblockingServer.h>
30#include <thrift/transport/TServerSocket.h>
31#include <thrift/transport/TSocket.h>
Divya Thaluru808d1432017-08-06 16:36:36 -070032#include <thrift/transport/TNonblockingServerSocket.h>
Roger Meier49ff8b12012-04-13 09:12:31 +000033#include <thrift/transport/TTransportUtils.h>
34#include <thrift/transport/TFileTransport.h>
Roger Meier33eaa0f2012-04-13 09:13:13 +000035#include <thrift/TLogging.h>
Mark Sleee02385b2007-06-09 01:21:16 +000036
37#include "Service.h"
38
Mark Sleee02385b2007-06-09 01:21:16 +000039#include <iostream>
40#include <set>
41#include <stdexcept>
42#include <sstream>
Mark Sleee02385b2007-06-09 01:21:16 +000043#include <map>
Jake Farrell5d02b802014-01-07 21:42:01 -050044#if _WIN32
Konrad Grochowski16a23a62014-11-13 15:33:38 +010045#include <thrift/windows/TWinsockSingleton.h>
Jake Farrell5d02b802014-01-07 21:42:01 -050046#endif
Mark Sleee02385b2007-06-09 01:21:16 +000047
48using namespace std;
Mark Sleee02385b2007-06-09 01:21:16 +000049
T Jake Lucianib5e62212009-01-31 22:36:20 +000050using namespace apache::thrift;
51using namespace apache::thrift::protocol;
52using namespace apache::thrift::transport;
53using namespace apache::thrift::server;
54using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000055
56using namespace test::stress;
57
58struct eqstr {
Konrad Grochowski16a23a62014-11-13 15:33:38 +010059 bool operator()(const char* s1, const char* s2) const { return strcmp(s1, s2) == 0; }
Mark Sleee02385b2007-06-09 01:21:16 +000060};
61
62struct ltstr {
Konrad Grochowski16a23a62014-11-13 15:33:38 +010063 bool operator()(const char* s1, const char* s2) const { return strcmp(s1, s2) < 0; }
Mark Sleee02385b2007-06-09 01:21:16 +000064};
65
Mark Sleee02385b2007-06-09 01:21:16 +000066// typedef hash_map<const char*, int, hash<const char*>, eqstr> count_map;
67typedef map<const char*, int, ltstr> count_map;
68
69class Server : public ServiceIf {
Konrad Grochowski16a23a62014-11-13 15:33:38 +010070public:
Mark Sleee02385b2007-06-09 01:21:16 +000071 Server() {}
72
73 void count(const char* method) {
Mark Slee79b16942007-11-26 19:05:29 +000074 Guard m(lock_);
Mark Sleee02385b2007-06-09 01:21:16 +000075 int ct = counts_[method];
76 counts_[method] = ++ct;
77 }
78
79 void echoVoid() {
80 count("echoVoid");
Mark Slee3e5d2d72007-06-15 01:45:56 +000081 // Sleep to simulate work
Jake Farrell5d02b802014-01-07 21:42:01 -050082 THRIFT_SLEEP_USEC(1);
Mark Sleee02385b2007-06-09 01:21:16 +000083 return;
84 }
85
86 count_map getCount() {
Mark Slee79b16942007-11-26 19:05:29 +000087 Guard m(lock_);
Mark Sleee02385b2007-06-09 01:21:16 +000088 return counts_;
89 }
90
Konrad Grochowski16a23a62014-11-13 15:33:38 +010091 int8_t echoByte(const int8_t arg) { return arg; }
92 int32_t echoI32(const int32_t arg) { return arg; }
93 int64_t echoI64(const int64_t arg) { return arg; }
94 void echoString(string& out, const string& arg) {
Mark Sleee02385b2007-06-09 01:21:16 +000095 if (arg != "hello") {
Roger Meiera8cef6e2011-07-17 18:55:59 +000096 T_ERROR_ABORT("WRONG STRING (%s)!!!!", arg.c_str());
Mark Sleee02385b2007-06-09 01:21:16 +000097 }
98 out = arg;
99 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100100 void echoList(vector<int8_t>& out, const vector<int8_t>& arg) { out = arg; }
101 void echoSet(set<int8_t>& out, const set<int8_t>& arg) { out = arg; }
102 void echoMap(map<int8_t, int8_t>& out, const map<int8_t, int8_t>& arg) { out = arg; }
Mark Sleee02385b2007-06-09 01:21:16 +0000103
104private:
105 count_map counts_;
106 Mutex lock_;
Mark Sleee02385b2007-06-09 01:21:16 +0000107};
108
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100109class ClientThread : public Runnable {
Mark Sleee02385b2007-06-09 01:21:16 +0000110public:
cyy316723a2019-01-05 16:35:14 +0800111 ClientThread(std::shared_ptr<TTransport> transport,
112 std::shared_ptr<ServiceClient> client,
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100113 Monitor& monitor,
114 size_t& workerCount,
115 size_t loopCount,
116 TType loopType)
117 : _transport(transport),
118 _client(client),
119 _monitor(monitor),
120 _workerCount(workerCount),
121 _loopCount(loopCount),
122 _loopType(loopType) {}
Mark Sleee02385b2007-06-09 01:21:16 +0000123
124 void run() {
125
126 // Wait for all worker threads to start
127
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100128 {
129 Synchronized s(_monitor);
130 while (_workerCount == 0) {
131 _monitor.wait();
132 }
Mark Sleee02385b2007-06-09 01:21:16 +0000133 }
134
135 _startTime = Util::currentTime();
136
137 _transport->open();
138
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100139 switch (_loopType) {
140 case T_VOID:
141 loopEchoVoid();
142 break;
143 case T_BYTE:
144 loopEchoByte();
145 break;
146 case T_I32:
147 loopEchoI32();
148 break;
149 case T_I64:
150 loopEchoI64();
151 break;
152 case T_STRING:
153 loopEchoString();
154 break;
155 default:
156 cerr << "Unexpected loop type" << _loopType << endl;
157 break;
Mark Sleee02385b2007-06-09 01:21:16 +0000158 }
159
160 _endTime = Util::currentTime();
161
162 _transport->close();
163
164 _done = true;
165
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100166 {
167 Synchronized s(_monitor);
Mark Sleee02385b2007-06-09 01:21:16 +0000168
169 _workerCount--;
170
Mark Slee3e5d2d72007-06-15 01:45:56 +0000171 if (_workerCount == 0) {
Mark Sleee02385b2007-06-09 01:21:16 +0000172
173 _monitor.notify();
174 }
175 }
176 }
177
178 void loopEchoVoid() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000179 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000180 _client->echoVoid();
181 }
182 }
183
184 void loopEchoByte() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000185 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000186 int8_t arg = 1;
187 int8_t result;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100188 result = _client->echoByte(arg);
Jake Farrell5d02b802014-01-07 21:42:01 -0500189 (void)result;
Mark Sleee02385b2007-06-09 01:21:16 +0000190 assert(result == arg);
191 }
192 }
193
194 void loopEchoI32() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000195 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000196 int32_t arg = 1;
197 int32_t result;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100198 result = _client->echoI32(arg);
Jake Farrell5d02b802014-01-07 21:42:01 -0500199 (void)result;
Mark Sleee02385b2007-06-09 01:21:16 +0000200 assert(result == arg);
201 }
202 }
203
204 void loopEchoI64() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000205 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000206 int64_t arg = 1;
207 int64_t result;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100208 result = _client->echoI64(arg);
Jake Farrell5d02b802014-01-07 21:42:01 -0500209 (void)result;
Mark Sleee02385b2007-06-09 01:21:16 +0000210 assert(result == arg);
211 }
212 }
213
214 void loopEchoString() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000215 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000216 string arg = "hello";
217 string result;
218 _client->echoString(result, arg);
219 assert(result == arg);
220 }
221 }
222
cyy316723a2019-01-05 16:35:14 +0800223 std::shared_ptr<TTransport> _transport;
224 std::shared_ptr<ServiceClient> _client;
Mark Sleee02385b2007-06-09 01:21:16 +0000225 Monitor& _monitor;
226 size_t& _workerCount;
227 size_t _loopCount;
228 TType _loopType;
Roger Meier5f9614c2010-11-21 16:59:05 +0000229 int64_t _startTime;
230 int64_t _endTime;
Mark Sleee02385b2007-06-09 01:21:16 +0000231 bool _done;
232 Monitor _sleep;
233};
234
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100235int main(int argc, char** argv) {
Jake Farrell5d02b802014-01-07 21:42:01 -0500236#if _WIN32
237 transport::TWinsockSingleton::create();
238#endif
Mark Sleee02385b2007-06-09 01:21:16 +0000239
240 int port = 9091;
241 string serverType = "simple";
242 string protocolType = "binary";
Jake Farrell5d02b802014-01-07 21:42:01 -0500243 uint32_t workerCount = 4;
244 uint32_t clientCount = 20;
245 uint32_t loopCount = 1000;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100246 TType loopType = T_VOID;
Mark Sleee02385b2007-06-09 01:21:16 +0000247 string callName = "echoVoid";
248 bool runServer = true;
249 bool logRequests = false;
250 string requestLogPath = "./requestlog.tlog";
251 bool replayRequests = false;
252
253 ostringstream usage;
254
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100255 usage << argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] "
256 "[--protocol-type=<protocol-type>] [--workers=<worker-count>] "
257 "[--clients=<client-count>] [--loop=<loop-count>]" << endl
258 << "\tclients Number of client threads to create - 0 implies no clients, i.e. "
259 "server only. Default is " << clientCount << endl
260 << "\thelp Prints this help text." << endl
261 << "\tcall Service method to call. Default is " << callName << endl
262 << "\tloop The number of remote thrift calls each client makes. Default is "
263 << loopCount << endl << "\tport The port the server and clients should bind to "
264 "for thrift network connections. Default is " << port << endl
265 << "\tserver Run the Thrift server in this process. Default is " << runServer
266 << endl << "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is "
267 << serverType << endl
268 << "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is "
269 << protocolType << endl
270 << "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests
271 << endl << "\treplay-request Replay requests from log file (./requestlog.tlog) Default is "
272 << replayRequests << endl << "\tworkers Number of thread pools workers. Only valid "
273 "for thread-pool server type. Default is " << workerCount
274 << endl;
Mark Sleee02385b2007-06-09 01:21:16 +0000275
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100276 map<string, string> args;
Mark Sleee02385b2007-06-09 01:21:16 +0000277
Mark Slee3e5d2d72007-06-15 01:45:56 +0000278 for (int ix = 1; ix < argc; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000279
280 string arg(argv[ix]);
281
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100282 if (arg.compare(0, 2, "--") == 0) {
Mark Sleee02385b2007-06-09 01:21:16 +0000283
284 size_t end = arg.find_first_of("=", 2);
285
286 string key = string(arg, 2, end - 2);
287
Mark Slee3e5d2d72007-06-15 01:45:56 +0000288 if (end != string::npos) {
Mark Sleee02385b2007-06-09 01:21:16 +0000289 args[key] = string(arg, end + 1);
290 } else {
291 args[key] = "true";
292 }
293 } else {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100294 throw invalid_argument("Unexcepted command line token: " + arg);
Mark Sleee02385b2007-06-09 01:21:16 +0000295 }
296 }
297
298 try {
299
Mark Slee3e5d2d72007-06-15 01:45:56 +0000300 if (!args["clients"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000301 clientCount = atoi(args["clients"].c_str());
302 }
303
Mark Slee3e5d2d72007-06-15 01:45:56 +0000304 if (!args["help"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000305 cerr << usage.str();
306 return 0;
307 }
308
Mark Slee3e5d2d72007-06-15 01:45:56 +0000309 if (!args["loop"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000310 loopCount = atoi(args["loop"].c_str());
311 }
312
Mark Slee3e5d2d72007-06-15 01:45:56 +0000313 if (!args["call"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000314 callName = args["call"];
315 }
316
Mark Slee3e5d2d72007-06-15 01:45:56 +0000317 if (!args["port"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000318 port = atoi(args["port"].c_str());
319 }
320
Mark Slee3e5d2d72007-06-15 01:45:56 +0000321 if (!args["server"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000322 runServer = args["server"] == "true";
323 }
324
Mark Slee3e5d2d72007-06-15 01:45:56 +0000325 if (!args["log-request"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000326 logRequests = args["log-request"] == "true";
327 }
328
Mark Slee3e5d2d72007-06-15 01:45:56 +0000329 if (!args["replay-request"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000330 replayRequests = args["replay-request"] == "true";
331 }
332
Mark Slee3e5d2d72007-06-15 01:45:56 +0000333 if (!args["server-type"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000334 serverType = args["server-type"];
335 }
336
Mark Slee3e5d2d72007-06-15 01:45:56 +0000337 if (!args["workers"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000338 workerCount = atoi(args["workers"].c_str());
339 }
340
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100341 } catch (std::exception& e) {
Mark Sleee02385b2007-06-09 01:21:16 +0000342 cerr << e.what() << endl;
Jake Farrell5d02b802014-01-07 21:42:01 -0500343 cerr << usage.str();
Mark Sleee02385b2007-06-09 01:21:16 +0000344 }
345
cyy316723a2019-01-05 16:35:14 +0800346 std::shared_ptr<PlatformThreadFactory> threadFactory
347 = std::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
Mark Sleee02385b2007-06-09 01:21:16 +0000348
349 // Dispatcher
cyy316723a2019-01-05 16:35:14 +0800350 std::shared_ptr<Server> serviceHandler(new Server());
Mark Sleee02385b2007-06-09 01:21:16 +0000351
352 if (replayRequests) {
cyy316723a2019-01-05 16:35:14 +0800353 std::shared_ptr<Server> serviceHandler(new Server());
354 std::shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
Mark Sleee02385b2007-06-09 01:21:16 +0000355
356 // Transports
cyy316723a2019-01-05 16:35:14 +0800357 std::shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
Mark Sleee02385b2007-06-09 01:21:16 +0000358 fileTransport->setChunkSize(2 * 1024 * 1024);
359 fileTransport->setMaxEventSize(1024 * 16);
360 fileTransport->seekToEnd();
361
362 // Protocol Factory
cyy316723a2019-01-05 16:35:14 +0800363 std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
Mark Sleee02385b2007-06-09 01:21:16 +0000364
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100365 TFileProcessor fileProcessor(serviceProcessor, protocolFactory, fileTransport);
Mark Sleee02385b2007-06-09 01:21:16 +0000366
367 fileProcessor.process(0, true);
368 exit(0);
369 }
370
Mark Slee3e5d2d72007-06-15 01:45:56 +0000371 if (runServer) {
Mark Sleee02385b2007-06-09 01:21:16 +0000372
cyy316723a2019-01-05 16:35:14 +0800373 std::shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
Mark Sleee02385b2007-06-09 01:21:16 +0000374
375 // Protocol Factory
cyy316723a2019-01-05 16:35:14 +0800376 std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
Mark Sleee02385b2007-06-09 01:21:16 +0000377
378 // Transport Factory
cyy316723a2019-01-05 16:35:14 +0800379 std::shared_ptr<TTransportFactory> transportFactory;
Mark Sleee02385b2007-06-09 01:21:16 +0000380
381 if (logRequests) {
382 // initialize the log file
cyy316723a2019-01-05 16:35:14 +0800383 std::shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
Mark Sleee02385b2007-06-09 01:21:16 +0000384 fileTransport->setChunkSize(2 * 1024 * 1024);
385 fileTransport->setMaxEventSize(1024 * 16);
386
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100387 transportFactory
cyy316723a2019-01-05 16:35:14 +0800388 = std::shared_ptr<TTransportFactory>(new TPipedTransportFactory(fileTransport));
Mark Sleee02385b2007-06-09 01:21:16 +0000389 }
390
cyy316723a2019-01-05 16:35:14 +0800391 std::shared_ptr<Thread> serverThread;
392 std::shared_ptr<Thread> serverThread2;
393 std::shared_ptr<transport::TNonblockingServerSocket> nbSocket1;
394 std::shared_ptr<transport::TNonblockingServerSocket> nbSocket2;
Mark Sleee02385b2007-06-09 01:21:16 +0000395
Mark Slee3e5d2d72007-06-15 01:45:56 +0000396 if (serverType == "simple") {
James E. King, III82ae9572017-08-05 12:23:54 -0400397
Divya Thaluru808d1432017-08-06 16:36:36 -0700398 nbSocket1.reset(new transport::TNonblockingServerSocket(port));
cyy316723a2019-01-05 16:35:14 +0800399 serverThread = threadFactory->newThread(std::shared_ptr<TServer>(
Divya Thaluru808d1432017-08-06 16:36:36 -0700400 new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket1)));
401 nbSocket2.reset(new transport::TNonblockingServerSocket(port + 1));
cyy316723a2019-01-05 16:35:14 +0800402 serverThread2 = threadFactory->newThread(std::shared_ptr<TServer>(
Divya Thaluru808d1432017-08-06 16:36:36 -0700403 new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket2)));
Mark Sleee02385b2007-06-09 01:21:16 +0000404
Mark Slee3e5d2d72007-06-15 01:45:56 +0000405 } else if (serverType == "thread-pool") {
Mark Sleee02385b2007-06-09 01:21:16 +0000406
cyy316723a2019-01-05 16:35:14 +0800407 std::shared_ptr<ThreadManager> threadManager
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100408 = ThreadManager::newSimpleThreadManager(workerCount);
Mark Sleee02385b2007-06-09 01:21:16 +0000409
410 threadManager->threadFactory(threadFactory);
411 threadManager->start();
Divya Thaluru808d1432017-08-06 16:36:36 -0700412 nbSocket1.reset(new transport::TNonblockingServerSocket(port));
cyy316723a2019-01-05 16:35:14 +0800413 serverThread = threadFactory->newThread(std::shared_ptr<TServer>(
Divya Thaluru808d1432017-08-06 16:36:36 -0700414 new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket1, threadManager)));
415 nbSocket2.reset(new transport::TNonblockingServerSocket(port + 1));
cyy316723a2019-01-05 16:35:14 +0800416 serverThread2 = threadFactory->newThread(std::shared_ptr<TServer>(
Divya Thaluru808d1432017-08-06 16:36:36 -0700417 new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket2, threadManager)));
Mark Sleee02385b2007-06-09 01:21:16 +0000418 }
419
Mark Slee79b16942007-11-26 19:05:29 +0000420 cerr << "Starting the server on port " << port << " and " << (port + 1) << endl;
Mark Sleee02385b2007-06-09 01:21:16 +0000421 serverThread->start();
Mark Slee79b16942007-11-26 19:05:29 +0000422 serverThread2->start();
Mark Sleee02385b2007-06-09 01:21:16 +0000423
424 // If we aren't running clients, just wait forever for external clients
425
426 if (clientCount == 0) {
427 serverThread->join();
Mark Slee79b16942007-11-26 19:05:29 +0000428 serverThread2->join();
Mark Sleee02385b2007-06-09 01:21:16 +0000429 }
430 }
Jake Farrell5d02b802014-01-07 21:42:01 -0500431 THRIFT_SLEEP_SEC(1);
Mark Sleee02385b2007-06-09 01:21:16 +0000432
433 if (clientCount > 0) {
434
435 Monitor monitor;
436
437 size_t threadCount = 0;
438
cyy316723a2019-01-05 16:35:14 +0800439 set<std::shared_ptr<Thread> > clientThreads;
Mark Sleee02385b2007-06-09 01:21:16 +0000440
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100441 if (callName == "echoVoid") {
442 loopType = T_VOID;
443 } else if (callName == "echoByte") {
444 loopType = T_BYTE;
445 } else if (callName == "echoI32") {
446 loopType = T_I32;
447 } else if (callName == "echoI64") {
448 loopType = T_I64;
449 } else if (callName == "echoString") {
450 loopType = T_STRING;
451 } else {
452 throw invalid_argument("Unknown service call " + callName);
453 }
Mark Sleee02385b2007-06-09 01:21:16 +0000454
Jake Farrell5d02b802014-01-07 21:42:01 -0500455 for (uint32_t ix = 0; ix < clientCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000456
cyy316723a2019-01-05 16:35:14 +0800457 std::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port + (ix % 2)));
458 std::shared_ptr<TFramedTransport> framedSocket(new TFramedTransport(socket));
459 std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(framedSocket));
460 std::shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
Mark Sleee02385b2007-06-09 01:21:16 +0000461
cyy316723a2019-01-05 16:35:14 +0800462 clientThreads.insert(threadFactory->newThread(std::shared_ptr<ClientThread>(
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100463 new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType))));
Mark Sleee02385b2007-06-09 01:21:16 +0000464 }
465
cyy316723a2019-01-05 16:35:14 +0800466 for (std::set<std::shared_ptr<Thread> >::const_iterator thread = clientThreads.begin();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100467 thread != clientThreads.end();
468 thread++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000469 (*thread)->start();
470 }
471
Roger Meier5f9614c2010-11-21 16:59:05 +0000472 int64_t time00;
473 int64_t time01;
Mark Sleee02385b2007-06-09 01:21:16 +0000474
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100475 {
476 Synchronized s(monitor);
Mark Sleee02385b2007-06-09 01:21:16 +0000477 threadCount = clientCount;
478
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100479 cerr << "Launch " << clientCount << " client threads" << endl;
Mark Sleee02385b2007-06-09 01:21:16 +0000480
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100481 time00 = Util::currentTime();
Mark Sleee02385b2007-06-09 01:21:16 +0000482
483 monitor.notifyAll();
484
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100485 while (threadCount > 0) {
Mark Sleee02385b2007-06-09 01:21:16 +0000486 monitor.wait();
487 }
488
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100489 time01 = Util::currentTime();
Mark Sleee02385b2007-06-09 01:21:16 +0000490 }
491
Roger Meier5f9614c2010-11-21 16:59:05 +0000492 int64_t firstTime = 9223372036854775807LL;
493 int64_t lastTime = 0;
Mark Sleee02385b2007-06-09 01:21:16 +0000494
495 double averageTime = 0;
Roger Meier5f9614c2010-11-21 16:59:05 +0000496 int64_t minTime = 9223372036854775807LL;
497 int64_t maxTime = 0;
Mark Sleee02385b2007-06-09 01:21:16 +0000498
cyy316723a2019-01-05 16:35:14 +0800499 for (set<std::shared_ptr<Thread> >::iterator ix = clientThreads.begin();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100500 ix != clientThreads.end();
501 ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000502
cyy316723a2019-01-05 16:35:14 +0800503 std::shared_ptr<ClientThread> client
504 = std::dynamic_pointer_cast<ClientThread>((*ix)->runnable());
Mark Sleee02385b2007-06-09 01:21:16 +0000505
Roger Meier5f9614c2010-11-21 16:59:05 +0000506 int64_t delta = client->_endTime - client->_startTime;
Mark Sleee02385b2007-06-09 01:21:16 +0000507
508 assert(delta > 0);
509
Mark Slee3e5d2d72007-06-15 01:45:56 +0000510 if (client->_startTime < firstTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000511 firstTime = client->_startTime;
512 }
513
Mark Slee3e5d2d72007-06-15 01:45:56 +0000514 if (client->_endTime > lastTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000515 lastTime = client->_endTime;
516 }
517
Mark Slee3e5d2d72007-06-15 01:45:56 +0000518 if (delta < minTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000519 minTime = delta;
520 }
521
Mark Slee3e5d2d72007-06-15 01:45:56 +0000522 if (delta > maxTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000523 maxTime = delta;
524 }
525
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100526 averageTime += delta;
Mark Sleee02385b2007-06-09 01:21:16 +0000527 }
528
529 averageTime /= clientCount;
530
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100531 cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount
532 << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl;
Mark Sleee02385b2007-06-09 01:21:16 +0000533
534 count_map count = serviceHandler->getCount();
535 count_map::iterator iter;
536 for (iter = count.begin(); iter != count.end(); ++iter) {
537 printf("%s => %d\n", iter->first, iter->second);
538 }
539 cerr << "done." << endl;
540 }
541
542 return 0;
543}