blob: e94ecb2db11f2b47553e231d5756725153a61389 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/Mutex.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/server/TNonblockingServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TNonblockingServerSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TFileTransport.h>
#include <thrift/TLogging.h>

#include "Service.h"

#include <cassert>
#include <cerrno>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <limits>
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>
#if _WIN32
#include <thrift/windows/TWinsockSingleton.h>
#endif
Mark Sleee02385b2007-06-09 01:21:16 +000046
47using namespace std;
Mark Sleee02385b2007-06-09 01:21:16 +000048
T Jake Lucianib5e62212009-01-31 22:36:20 +000049using namespace apache::thrift;
50using namespace apache::thrift::protocol;
51using namespace apache::thrift::transport;
52using namespace apache::thrift::server;
53using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000054
55using namespace test::stress;
56
// Equality predicate for NUL-terminated C strings: compares the characters the
// pointers refer to (via strcmp), not the pointer values. Both arguments must be non-null.
struct eqstr {
  bool operator()(const char* s1, const char* s2) const { return strcmp(s1, s2) == 0; }
};
60
// Strict "less than" ordering for NUL-terminated C strings, based on strcmp.
// Used as the key comparator of count_map so that keys are ordered by content rather
// than by pointer value. Both arguments must be non-null.
struct ltstr {
  bool operator()(const char* s1, const char* s2) const { return strcmp(s1, s2) < 0; }
};
64
Mark Sleee02385b2007-06-09 01:21:16 +000065// typedef hash_map<const char*, int, hash<const char*>, eqstr> count_map;
66typedef map<const char*, int, ltstr> count_map;
67
68class Server : public ServiceIf {
Konrad Grochowski16a23a62014-11-13 15:33:38 +010069public:
Sebastian Zenker042580f2019-01-29 15:48:12 +010070 Server() = default;
Mark Sleee02385b2007-06-09 01:21:16 +000071
72 void count(const char* method) {
Mark Slee79b16942007-11-26 19:05:29 +000073 Guard m(lock_);
Mark Sleee02385b2007-06-09 01:21:16 +000074 int ct = counts_[method];
75 counts_[method] = ++ct;
76 }
77
Sebastian Zenker042580f2019-01-29 15:48:12 +010078 void echoVoid() override {
Mark Sleee02385b2007-06-09 01:21:16 +000079 count("echoVoid");
Mark Slee3e5d2d72007-06-15 01:45:56 +000080 // Sleep to simulate work
Jake Farrell5d02b802014-01-07 21:42:01 -050081 THRIFT_SLEEP_USEC(1);
Mark Sleee02385b2007-06-09 01:21:16 +000082 return;
83 }
84
85 count_map getCount() {
Mark Slee79b16942007-11-26 19:05:29 +000086 Guard m(lock_);
Mark Sleee02385b2007-06-09 01:21:16 +000087 return counts_;
88 }
89
Sebastian Zenker042580f2019-01-29 15:48:12 +010090 int8_t echoByte(const int8_t arg) override { return arg; }
91 int32_t echoI32(const int32_t arg) override { return arg; }
92 int64_t echoI64(const int64_t arg) override { return arg; }
93 void echoString(string& out, const string& arg) override {
Mark Sleee02385b2007-06-09 01:21:16 +000094 if (arg != "hello") {
Roger Meiera8cef6e2011-07-17 18:55:59 +000095 T_ERROR_ABORT("WRONG STRING (%s)!!!!", arg.c_str());
Mark Sleee02385b2007-06-09 01:21:16 +000096 }
97 out = arg;
98 }
Sebastian Zenker042580f2019-01-29 15:48:12 +010099 void echoList(vector<int8_t>& out, const vector<int8_t>& arg) override { out = arg; }
100 void echoSet(set<int8_t>& out, const set<int8_t>& arg) override { out = arg; }
101 void echoMap(map<int8_t, int8_t>& out, const map<int8_t, int8_t>& arg) override { out = arg; }
Mark Sleee02385b2007-06-09 01:21:16 +0000102
103private:
104 count_map counts_;
105 Mutex lock_;
Mark Sleee02385b2007-06-09 01:21:16 +0000106};
107
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100108class ClientThread : public Runnable {
Mark Sleee02385b2007-06-09 01:21:16 +0000109public:
cyy316723a2019-01-05 16:35:14 +0800110 ClientThread(std::shared_ptr<TTransport> transport,
111 std::shared_ptr<ServiceClient> client,
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100112 Monitor& monitor,
113 size_t& workerCount,
114 size_t loopCount,
115 TType loopType)
116 : _transport(transport),
117 _client(client),
118 _monitor(monitor),
119 _workerCount(workerCount),
120 _loopCount(loopCount),
121 _loopType(loopType) {}
Mark Sleee02385b2007-06-09 01:21:16 +0000122
Sebastian Zenker042580f2019-01-29 15:48:12 +0100123 void run() override {
Mark Sleee02385b2007-06-09 01:21:16 +0000124
125 // Wait for all worker threads to start
126
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100127 {
128 Synchronized s(_monitor);
129 while (_workerCount == 0) {
130 _monitor.wait();
131 }
Mark Sleee02385b2007-06-09 01:21:16 +0000132 }
133
cyybfdbd032019-01-12 14:38:28 +0800134 _startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
Mark Sleee02385b2007-06-09 01:21:16 +0000135
136 _transport->open();
137
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100138 switch (_loopType) {
139 case T_VOID:
140 loopEchoVoid();
141 break;
142 case T_BYTE:
143 loopEchoByte();
144 break;
145 case T_I32:
146 loopEchoI32();
147 break;
148 case T_I64:
149 loopEchoI64();
150 break;
151 case T_STRING:
152 loopEchoString();
153 break;
154 default:
155 cerr << "Unexpected loop type" << _loopType << endl;
156 break;
Mark Sleee02385b2007-06-09 01:21:16 +0000157 }
158
cyybfdbd032019-01-12 14:38:28 +0800159 _endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
Mark Sleee02385b2007-06-09 01:21:16 +0000160
161 _transport->close();
162
163 _done = true;
164
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100165 {
166 Synchronized s(_monitor);
Mark Sleee02385b2007-06-09 01:21:16 +0000167
168 _workerCount--;
169
Mark Slee3e5d2d72007-06-15 01:45:56 +0000170 if (_workerCount == 0) {
Mark Sleee02385b2007-06-09 01:21:16 +0000171
172 _monitor.notify();
173 }
174 }
175 }
176
177 void loopEchoVoid() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000178 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000179 _client->echoVoid();
180 }
181 }
182
183 void loopEchoByte() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000184 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000185 int8_t arg = 1;
186 int8_t result;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100187 result = _client->echoByte(arg);
Jake Farrell5d02b802014-01-07 21:42:01 -0500188 (void)result;
Mark Sleee02385b2007-06-09 01:21:16 +0000189 assert(result == arg);
190 }
191 }
192
193 void loopEchoI32() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000194 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000195 int32_t arg = 1;
196 int32_t result;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100197 result = _client->echoI32(arg);
Jake Farrell5d02b802014-01-07 21:42:01 -0500198 (void)result;
Mark Sleee02385b2007-06-09 01:21:16 +0000199 assert(result == arg);
200 }
201 }
202
203 void loopEchoI64() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000204 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000205 int64_t arg = 1;
206 int64_t result;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100207 result = _client->echoI64(arg);
Jake Farrell5d02b802014-01-07 21:42:01 -0500208 (void)result;
Mark Sleee02385b2007-06-09 01:21:16 +0000209 assert(result == arg);
210 }
211 }
212
213 void loopEchoString() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000214 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000215 string arg = "hello";
216 string result;
217 _client->echoString(result, arg);
218 assert(result == arg);
219 }
220 }
221
cyy316723a2019-01-05 16:35:14 +0800222 std::shared_ptr<TTransport> _transport;
223 std::shared_ptr<ServiceClient> _client;
Mark Sleee02385b2007-06-09 01:21:16 +0000224 Monitor& _monitor;
225 size_t& _workerCount;
226 size_t _loopCount;
227 TType _loopType;
Roger Meier5f9614c2010-11-21 16:59:05 +0000228 int64_t _startTime;
229 int64_t _endTime;
Mark Sleee02385b2007-06-09 01:21:16 +0000230 bool _done;
231 Monitor _sleep;
232};
233
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100234int main(int argc, char** argv) {
Jake Farrell5d02b802014-01-07 21:42:01 -0500235#if _WIN32
236 transport::TWinsockSingleton::create();
237#endif
Mark Sleee02385b2007-06-09 01:21:16 +0000238
239 int port = 9091;
240 string serverType = "simple";
241 string protocolType = "binary";
Jake Farrell5d02b802014-01-07 21:42:01 -0500242 uint32_t workerCount = 4;
243 uint32_t clientCount = 20;
244 uint32_t loopCount = 1000;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100245 TType loopType = T_VOID;
Mark Sleee02385b2007-06-09 01:21:16 +0000246 string callName = "echoVoid";
247 bool runServer = true;
248 bool logRequests = false;
249 string requestLogPath = "./requestlog.tlog";
250 bool replayRequests = false;
251
252 ostringstream usage;
253
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100254 usage << argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] "
255 "[--protocol-type=<protocol-type>] [--workers=<worker-count>] "
256 "[--clients=<client-count>] [--loop=<loop-count>]" << endl
257 << "\tclients Number of client threads to create - 0 implies no clients, i.e. "
258 "server only. Default is " << clientCount << endl
259 << "\thelp Prints this help text." << endl
260 << "\tcall Service method to call. Default is " << callName << endl
261 << "\tloop The number of remote thrift calls each client makes. Default is "
262 << loopCount << endl << "\tport The port the server and clients should bind to "
263 "for thrift network connections. Default is " << port << endl
264 << "\tserver Run the Thrift server in this process. Default is " << runServer
265 << endl << "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is "
266 << serverType << endl
267 << "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is "
268 << protocolType << endl
269 << "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests
270 << endl << "\treplay-request Replay requests from log file (./requestlog.tlog) Default is "
271 << replayRequests << endl << "\tworkers Number of thread pools workers. Only valid "
272 "for thread-pool server type. Default is " << workerCount
273 << endl;
Mark Sleee02385b2007-06-09 01:21:16 +0000274
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100275 map<string, string> args;
Mark Sleee02385b2007-06-09 01:21:16 +0000276
Mark Slee3e5d2d72007-06-15 01:45:56 +0000277 for (int ix = 1; ix < argc; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000278
279 string arg(argv[ix]);
280
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100281 if (arg.compare(0, 2, "--") == 0) {
Mark Sleee02385b2007-06-09 01:21:16 +0000282
283 size_t end = arg.find_first_of("=", 2);
284
285 string key = string(arg, 2, end - 2);
286
Mark Slee3e5d2d72007-06-15 01:45:56 +0000287 if (end != string::npos) {
Mark Sleee02385b2007-06-09 01:21:16 +0000288 args[key] = string(arg, end + 1);
289 } else {
290 args[key] = "true";
291 }
292 } else {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100293 throw invalid_argument("Unexcepted command line token: " + arg);
Mark Sleee02385b2007-06-09 01:21:16 +0000294 }
295 }
296
297 try {
298
Mark Slee3e5d2d72007-06-15 01:45:56 +0000299 if (!args["clients"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000300 clientCount = atoi(args["clients"].c_str());
301 }
302
Mark Slee3e5d2d72007-06-15 01:45:56 +0000303 if (!args["help"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000304 cerr << usage.str();
305 return 0;
306 }
307
Mark Slee3e5d2d72007-06-15 01:45:56 +0000308 if (!args["loop"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000309 loopCount = atoi(args["loop"].c_str());
310 }
311
Mark Slee3e5d2d72007-06-15 01:45:56 +0000312 if (!args["call"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000313 callName = args["call"];
314 }
315
Mark Slee3e5d2d72007-06-15 01:45:56 +0000316 if (!args["port"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000317 port = atoi(args["port"].c_str());
318 }
319
Mark Slee3e5d2d72007-06-15 01:45:56 +0000320 if (!args["server"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000321 runServer = args["server"] == "true";
322 }
323
Mark Slee3e5d2d72007-06-15 01:45:56 +0000324 if (!args["log-request"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000325 logRequests = args["log-request"] == "true";
326 }
327
Mark Slee3e5d2d72007-06-15 01:45:56 +0000328 if (!args["replay-request"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000329 replayRequests = args["replay-request"] == "true";
330 }
331
Mark Slee3e5d2d72007-06-15 01:45:56 +0000332 if (!args["server-type"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000333 serverType = args["server-type"];
334 }
335
Mark Slee3e5d2d72007-06-15 01:45:56 +0000336 if (!args["workers"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000337 workerCount = atoi(args["workers"].c_str());
338 }
339
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100340 } catch (std::exception& e) {
Mark Sleee02385b2007-06-09 01:21:16 +0000341 cerr << e.what() << endl;
Jake Farrell5d02b802014-01-07 21:42:01 -0500342 cerr << usage.str();
Mark Sleee02385b2007-06-09 01:21:16 +0000343 }
344
cyyca8af9b2019-01-11 22:13:12 +0800345 std::shared_ptr<ThreadFactory> threadFactory
346 = std::shared_ptr<ThreadFactory>(new ThreadFactory());
Mark Sleee02385b2007-06-09 01:21:16 +0000347
348 // Dispatcher
cyy316723a2019-01-05 16:35:14 +0800349 std::shared_ptr<Server> serviceHandler(new Server());
Mark Sleee02385b2007-06-09 01:21:16 +0000350
351 if (replayRequests) {
cyy316723a2019-01-05 16:35:14 +0800352 std::shared_ptr<Server> serviceHandler(new Server());
353 std::shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
Mark Sleee02385b2007-06-09 01:21:16 +0000354
355 // Transports
cyy316723a2019-01-05 16:35:14 +0800356 std::shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
Mark Sleee02385b2007-06-09 01:21:16 +0000357 fileTransport->setChunkSize(2 * 1024 * 1024);
358 fileTransport->setMaxEventSize(1024 * 16);
359 fileTransport->seekToEnd();
360
361 // Protocol Factory
cyy316723a2019-01-05 16:35:14 +0800362 std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
Mark Sleee02385b2007-06-09 01:21:16 +0000363
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100364 TFileProcessor fileProcessor(serviceProcessor, protocolFactory, fileTransport);
Mark Sleee02385b2007-06-09 01:21:16 +0000365
366 fileProcessor.process(0, true);
367 exit(0);
368 }
369
Mark Slee3e5d2d72007-06-15 01:45:56 +0000370 if (runServer) {
Mark Sleee02385b2007-06-09 01:21:16 +0000371
cyy316723a2019-01-05 16:35:14 +0800372 std::shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
Mark Sleee02385b2007-06-09 01:21:16 +0000373
374 // Protocol Factory
cyy316723a2019-01-05 16:35:14 +0800375 std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
Mark Sleee02385b2007-06-09 01:21:16 +0000376
377 // Transport Factory
cyy316723a2019-01-05 16:35:14 +0800378 std::shared_ptr<TTransportFactory> transportFactory;
Mark Sleee02385b2007-06-09 01:21:16 +0000379
380 if (logRequests) {
381 // initialize the log file
cyy316723a2019-01-05 16:35:14 +0800382 std::shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
Mark Sleee02385b2007-06-09 01:21:16 +0000383 fileTransport->setChunkSize(2 * 1024 * 1024);
384 fileTransport->setMaxEventSize(1024 * 16);
385
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100386 transportFactory
cyy316723a2019-01-05 16:35:14 +0800387 = std::shared_ptr<TTransportFactory>(new TPipedTransportFactory(fileTransport));
Mark Sleee02385b2007-06-09 01:21:16 +0000388 }
389
cyy316723a2019-01-05 16:35:14 +0800390 std::shared_ptr<Thread> serverThread;
391 std::shared_ptr<Thread> serverThread2;
392 std::shared_ptr<transport::TNonblockingServerSocket> nbSocket1;
393 std::shared_ptr<transport::TNonblockingServerSocket> nbSocket2;
Mark Sleee02385b2007-06-09 01:21:16 +0000394
Mark Slee3e5d2d72007-06-15 01:45:56 +0000395 if (serverType == "simple") {
James E. King, III82ae9572017-08-05 12:23:54 -0400396
Divya Thaluru808d1432017-08-06 16:36:36 -0700397 nbSocket1.reset(new transport::TNonblockingServerSocket(port));
cyy316723a2019-01-05 16:35:14 +0800398 serverThread = threadFactory->newThread(std::shared_ptr<TServer>(
Divya Thaluru808d1432017-08-06 16:36:36 -0700399 new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket1)));
400 nbSocket2.reset(new transport::TNonblockingServerSocket(port + 1));
cyy316723a2019-01-05 16:35:14 +0800401 serverThread2 = threadFactory->newThread(std::shared_ptr<TServer>(
Divya Thaluru808d1432017-08-06 16:36:36 -0700402 new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket2)));
Mark Sleee02385b2007-06-09 01:21:16 +0000403
Mark Slee3e5d2d72007-06-15 01:45:56 +0000404 } else if (serverType == "thread-pool") {
Mark Sleee02385b2007-06-09 01:21:16 +0000405
cyy316723a2019-01-05 16:35:14 +0800406 std::shared_ptr<ThreadManager> threadManager
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100407 = ThreadManager::newSimpleThreadManager(workerCount);
Mark Sleee02385b2007-06-09 01:21:16 +0000408
409 threadManager->threadFactory(threadFactory);
410 threadManager->start();
Divya Thaluru808d1432017-08-06 16:36:36 -0700411 nbSocket1.reset(new transport::TNonblockingServerSocket(port));
cyy316723a2019-01-05 16:35:14 +0800412 serverThread = threadFactory->newThread(std::shared_ptr<TServer>(
Divya Thaluru808d1432017-08-06 16:36:36 -0700413 new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket1, threadManager)));
414 nbSocket2.reset(new transport::TNonblockingServerSocket(port + 1));
cyy316723a2019-01-05 16:35:14 +0800415 serverThread2 = threadFactory->newThread(std::shared_ptr<TServer>(
Divya Thaluru808d1432017-08-06 16:36:36 -0700416 new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket2, threadManager)));
Mark Sleee02385b2007-06-09 01:21:16 +0000417 }
418
Mark Slee79b16942007-11-26 19:05:29 +0000419 cerr << "Starting the server on port " << port << " and " << (port + 1) << endl;
Mark Sleee02385b2007-06-09 01:21:16 +0000420 serverThread->start();
Mark Slee79b16942007-11-26 19:05:29 +0000421 serverThread2->start();
Mark Sleee02385b2007-06-09 01:21:16 +0000422
423 // If we aren't running clients, just wait forever for external clients
424
425 if (clientCount == 0) {
426 serverThread->join();
Mark Slee79b16942007-11-26 19:05:29 +0000427 serverThread2->join();
Mark Sleee02385b2007-06-09 01:21:16 +0000428 }
429 }
Jake Farrell5d02b802014-01-07 21:42:01 -0500430 THRIFT_SLEEP_SEC(1);
Mark Sleee02385b2007-06-09 01:21:16 +0000431
432 if (clientCount > 0) {
433
434 Monitor monitor;
435
436 size_t threadCount = 0;
437
cyy316723a2019-01-05 16:35:14 +0800438 set<std::shared_ptr<Thread> > clientThreads;
Mark Sleee02385b2007-06-09 01:21:16 +0000439
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100440 if (callName == "echoVoid") {
441 loopType = T_VOID;
442 } else if (callName == "echoByte") {
443 loopType = T_BYTE;
444 } else if (callName == "echoI32") {
445 loopType = T_I32;
446 } else if (callName == "echoI64") {
447 loopType = T_I64;
448 } else if (callName == "echoString") {
449 loopType = T_STRING;
450 } else {
451 throw invalid_argument("Unknown service call " + callName);
452 }
Mark Sleee02385b2007-06-09 01:21:16 +0000453
Jake Farrell5d02b802014-01-07 21:42:01 -0500454 for (uint32_t ix = 0; ix < clientCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000455
cyy316723a2019-01-05 16:35:14 +0800456 std::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port + (ix % 2)));
457 std::shared_ptr<TFramedTransport> framedSocket(new TFramedTransport(socket));
458 std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(framedSocket));
459 std::shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
Mark Sleee02385b2007-06-09 01:21:16 +0000460
cyy316723a2019-01-05 16:35:14 +0800461 clientThreads.insert(threadFactory->newThread(std::shared_ptr<ClientThread>(
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100462 new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType))));
Mark Sleee02385b2007-06-09 01:21:16 +0000463 }
464
Sebastian Zenker042580f2019-01-29 15:48:12 +0100465 for (auto thread = clientThreads.begin();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100466 thread != clientThreads.end();
467 thread++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000468 (*thread)->start();
469 }
470
Roger Meier5f9614c2010-11-21 16:59:05 +0000471 int64_t time00;
472 int64_t time01;
Mark Sleee02385b2007-06-09 01:21:16 +0000473
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100474 {
475 Synchronized s(monitor);
Mark Sleee02385b2007-06-09 01:21:16 +0000476 threadCount = clientCount;
477
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100478 cerr << "Launch " << clientCount << " client threads" << endl;
Mark Sleee02385b2007-06-09 01:21:16 +0000479
cyybfdbd032019-01-12 14:38:28 +0800480 time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
Mark Sleee02385b2007-06-09 01:21:16 +0000481
482 monitor.notifyAll();
483
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100484 while (threadCount > 0) {
Mark Sleee02385b2007-06-09 01:21:16 +0000485 monitor.wait();
486 }
487
cyybfdbd032019-01-12 14:38:28 +0800488 time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
Mark Sleee02385b2007-06-09 01:21:16 +0000489 }
490
Roger Meier5f9614c2010-11-21 16:59:05 +0000491 int64_t firstTime = 9223372036854775807LL;
492 int64_t lastTime = 0;
Mark Sleee02385b2007-06-09 01:21:16 +0000493
494 double averageTime = 0;
Roger Meier5f9614c2010-11-21 16:59:05 +0000495 int64_t minTime = 9223372036854775807LL;
496 int64_t maxTime = 0;
Mark Sleee02385b2007-06-09 01:21:16 +0000497
Sebastian Zenker042580f2019-01-29 15:48:12 +0100498 for (auto ix = clientThreads.begin();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100499 ix != clientThreads.end();
500 ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000501
cyy316723a2019-01-05 16:35:14 +0800502 std::shared_ptr<ClientThread> client
503 = std::dynamic_pointer_cast<ClientThread>((*ix)->runnable());
Mark Sleee02385b2007-06-09 01:21:16 +0000504
Roger Meier5f9614c2010-11-21 16:59:05 +0000505 int64_t delta = client->_endTime - client->_startTime;
Mark Sleee02385b2007-06-09 01:21:16 +0000506
507 assert(delta > 0);
508
Mark Slee3e5d2d72007-06-15 01:45:56 +0000509 if (client->_startTime < firstTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000510 firstTime = client->_startTime;
511 }
512
Mark Slee3e5d2d72007-06-15 01:45:56 +0000513 if (client->_endTime > lastTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000514 lastTime = client->_endTime;
515 }
516
Mark Slee3e5d2d72007-06-15 01:45:56 +0000517 if (delta < minTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000518 minTime = delta;
519 }
520
Mark Slee3e5d2d72007-06-15 01:45:56 +0000521 if (delta > maxTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000522 maxTime = delta;
523 }
524
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100525 averageTime += delta;
Mark Sleee02385b2007-06-09 01:21:16 +0000526 }
527
528 averageTime /= clientCount;
529
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100530 cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount
531 << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl;
Mark Sleee02385b2007-06-09 01:21:16 +0000532
533 count_map count = serviceHandler->getCount();
534 count_map::iterator iter;
535 for (iter = count.begin(); iter != count.end(); ++iter) {
536 printf("%s => %d\n", iter->first, iter->second);
537 }
538 cerr << "done." << endl;
539 }
540
541 return 0;
542}