/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/Mutex.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/server/TNonblockingServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TNonblockingServerSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TFileTransport.h>
#include <thrift/TLogging.h>

#include "Service.h"

#include <cassert>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>
#if _WIN32
#include <thrift/windows/TWinsockSingleton.h>
#endif

using namespace std;

using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::server;
using namespace apache::thrift::concurrency;

using namespace test::stress;

/**
 * Equality predicate for NUL-terminated C strings.
 *
 * Compares the characters the two pointers refer to (via strcmp) rather than
 * the pointer values themselves. Both arguments must be non-null.
 */
struct eqstr {
  bool operator()(const char* s1, const char* s2) const { return strcmp(s1, s2) == 0; }
};

/**
 * Strict-weak-ordering comparator for NUL-terminated C strings.
 *
 * Orders by string content (via strcmp) rather than by pointer value, so it
 * can be used as the comparison functor of an ordered container keyed by
 * const char*. Both arguments must be non-null.
 */
struct ltstr {
  bool operator()(const char* s1, const char* s2) const { return strcmp(s1, s2) < 0; }
};

Mark Sleee02385b2007-06-09 01:21:16 +000065// typedef hash_map<const char*, int, hash<const char*>, eqstr> count_map;
66typedef map<const char*, int, ltstr> count_map;
67
68class Server : public ServiceIf {
Konrad Grochowski16a23a62014-11-13 15:33:38 +010069public:
Sebastian Zenker042580f2019-01-29 15:48:12 +010070 Server() = default;
Mark Sleee02385b2007-06-09 01:21:16 +000071
72 void count(const char* method) {
Mark Slee79b16942007-11-26 19:05:29 +000073 Guard m(lock_);
Mark Sleee02385b2007-06-09 01:21:16 +000074 int ct = counts_[method];
75 counts_[method] = ++ct;
76 }
77
Sebastian Zenker042580f2019-01-29 15:48:12 +010078 void echoVoid() override {
Mark Sleee02385b2007-06-09 01:21:16 +000079 count("echoVoid");
Mark Slee3e5d2d72007-06-15 01:45:56 +000080 // Sleep to simulate work
Jake Farrell5d02b802014-01-07 21:42:01 -050081 THRIFT_SLEEP_USEC(1);
Mark Sleee02385b2007-06-09 01:21:16 +000082 return;
83 }
84
85 count_map getCount() {
Mark Slee79b16942007-11-26 19:05:29 +000086 Guard m(lock_);
Mark Sleee02385b2007-06-09 01:21:16 +000087 return counts_;
88 }
89
Sebastian Zenker042580f2019-01-29 15:48:12 +010090 int8_t echoByte(const int8_t arg) override { return arg; }
91 int32_t echoI32(const int32_t arg) override { return arg; }
92 int64_t echoI64(const int64_t arg) override { return arg; }
93 void echoString(string& out, const string& arg) override {
Mark Sleee02385b2007-06-09 01:21:16 +000094 if (arg != "hello") {
Roger Meiera8cef6e2011-07-17 18:55:59 +000095 T_ERROR_ABORT("WRONG STRING (%s)!!!!", arg.c_str());
Mark Sleee02385b2007-06-09 01:21:16 +000096 }
97 out = arg;
98 }
Sebastian Zenker042580f2019-01-29 15:48:12 +010099 void echoList(vector<int8_t>& out, const vector<int8_t>& arg) override { out = arg; }
100 void echoSet(set<int8_t>& out, const set<int8_t>& arg) override { out = arg; }
101 void echoMap(map<int8_t, int8_t>& out, const map<int8_t, int8_t>& arg) override { out = arg; }
Mark Sleee02385b2007-06-09 01:21:16 +0000102
103private:
104 count_map counts_;
105 Mutex lock_;
Mark Sleee02385b2007-06-09 01:21:16 +0000106};
107
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100108class ClientThread : public Runnable {
Mark Sleee02385b2007-06-09 01:21:16 +0000109public:
cyy316723a2019-01-05 16:35:14 +0800110 ClientThread(std::shared_ptr<TTransport> transport,
111 std::shared_ptr<ServiceClient> client,
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100112 Monitor& monitor,
113 size_t& workerCount,
114 size_t loopCount,
115 TType loopType)
116 : _transport(transport),
117 _client(client),
118 _monitor(monitor),
119 _workerCount(workerCount),
120 _loopCount(loopCount),
121 _loopType(loopType) {}
Mark Sleee02385b2007-06-09 01:21:16 +0000122
Sebastian Zenker042580f2019-01-29 15:48:12 +0100123 void run() override {
Mark Sleee02385b2007-06-09 01:21:16 +0000124
125 // Wait for all worker threads to start
126
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100127 {
128 Synchronized s(_monitor);
129 while (_workerCount == 0) {
130 _monitor.wait();
131 }
Mark Sleee02385b2007-06-09 01:21:16 +0000132 }
133
cyybfdbd032019-01-12 14:38:28 +0800134 _startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
Mark Sleee02385b2007-06-09 01:21:16 +0000135
136 _transport->open();
137
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100138 switch (_loopType) {
139 case T_VOID:
140 loopEchoVoid();
141 break;
142 case T_BYTE:
143 loopEchoByte();
144 break;
145 case T_I32:
146 loopEchoI32();
147 break;
148 case T_I64:
149 loopEchoI64();
150 break;
151 case T_STRING:
152 loopEchoString();
153 break;
154 default:
CJCombrink4a280d52024-03-14 19:57:41 +0100155 cerr << "Unexpected loop type" << _loopType << '\n';
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100156 break;
Mark Sleee02385b2007-06-09 01:21:16 +0000157 }
158
cyybfdbd032019-01-12 14:38:28 +0800159 _endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
Mark Sleee02385b2007-06-09 01:21:16 +0000160
161 _transport->close();
162
163 _done = true;
164
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100165 {
166 Synchronized s(_monitor);
Mark Sleee02385b2007-06-09 01:21:16 +0000167
168 _workerCount--;
169
Mark Slee3e5d2d72007-06-15 01:45:56 +0000170 if (_workerCount == 0) {
Mark Sleee02385b2007-06-09 01:21:16 +0000171
172 _monitor.notify();
173 }
174 }
175 }
176
177 void loopEchoVoid() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000178 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000179 _client->echoVoid();
180 }
181 }
182
183 void loopEchoByte() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000184 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000185 int8_t arg = 1;
186 int8_t result;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100187 result = _client->echoByte(arg);
Jake Farrell5d02b802014-01-07 21:42:01 -0500188 (void)result;
Mark Sleee02385b2007-06-09 01:21:16 +0000189 assert(result == arg);
190 }
191 }
192
193 void loopEchoI32() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000194 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000195 int32_t arg = 1;
196 int32_t result;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100197 result = _client->echoI32(arg);
Jake Farrell5d02b802014-01-07 21:42:01 -0500198 (void)result;
Mark Sleee02385b2007-06-09 01:21:16 +0000199 assert(result == arg);
200 }
201 }
202
203 void loopEchoI64() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000204 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000205 int64_t arg = 1;
206 int64_t result;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100207 result = _client->echoI64(arg);
Jake Farrell5d02b802014-01-07 21:42:01 -0500208 (void)result;
Mark Sleee02385b2007-06-09 01:21:16 +0000209 assert(result == arg);
210 }
211 }
212
213 void loopEchoString() {
Mark Slee3e5d2d72007-06-15 01:45:56 +0000214 for (size_t ix = 0; ix < _loopCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000215 string arg = "hello";
216 string result;
217 _client->echoString(result, arg);
218 assert(result == arg);
219 }
220 }
221
cyy316723a2019-01-05 16:35:14 +0800222 std::shared_ptr<TTransport> _transport;
223 std::shared_ptr<ServiceClient> _client;
Mark Sleee02385b2007-06-09 01:21:16 +0000224 Monitor& _monitor;
225 size_t& _workerCount;
226 size_t _loopCount;
227 TType _loopType;
Roger Meier5f9614c2010-11-21 16:59:05 +0000228 int64_t _startTime;
229 int64_t _endTime;
Mark Sleee02385b2007-06-09 01:21:16 +0000230 bool _done;
231 Monitor _sleep;
232};
233
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100234int main(int argc, char** argv) {
Jake Farrell5d02b802014-01-07 21:42:01 -0500235#if _WIN32
236 transport::TWinsockSingleton::create();
237#endif
Mark Sleee02385b2007-06-09 01:21:16 +0000238
239 int port = 9091;
240 string serverType = "simple";
241 string protocolType = "binary";
Jake Farrell5d02b802014-01-07 21:42:01 -0500242 uint32_t workerCount = 4;
243 uint32_t clientCount = 20;
244 uint32_t loopCount = 1000;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100245 TType loopType = T_VOID;
Mark Sleee02385b2007-06-09 01:21:16 +0000246 string callName = "echoVoid";
247 bool runServer = true;
248 bool logRequests = false;
249 string requestLogPath = "./requestlog.tlog";
250 bool replayRequests = false;
251
252 ostringstream usage;
253
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100254 usage << argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] "
255 "[--protocol-type=<protocol-type>] [--workers=<worker-count>] "
CJCombrink4a280d52024-03-14 19:57:41 +0100256 "[--clients=<client-count>] [--loop=<loop-count>]" << '\n'
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100257 << "\tclients Number of client threads to create - 0 implies no clients, i.e. "
CJCombrink4a280d52024-03-14 19:57:41 +0100258 "server only. Default is " << clientCount << '\n'
259 << "\thelp Prints this help text." << '\n'
260 << "\tcall Service method to call. Default is " << callName << '\n'
261 << "\tloop The number of remote thrift calls each client makes. Default is " << loopCount << '\n'
262 << "\tport The port the server and clients should bind to for thrift network "
263 "connections. Default is " << port << '\n'
264 << "\tserver Run the Thrift server in this process. Default is " << runServer << '\n'
265 << "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << '\n'
266 << "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << '\n'
267 << "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests << '\n'
268 << "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " << replayRequests << '\n'
269 << "\tworkers Number of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << '\n';
Mark Sleee02385b2007-06-09 01:21:16 +0000270
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100271 map<string, string> args;
Mark Sleee02385b2007-06-09 01:21:16 +0000272
Mark Slee3e5d2d72007-06-15 01:45:56 +0000273 for (int ix = 1; ix < argc; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000274
275 string arg(argv[ix]);
276
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100277 if (arg.compare(0, 2, "--") == 0) {
Mark Sleee02385b2007-06-09 01:21:16 +0000278
279 size_t end = arg.find_first_of("=", 2);
280
281 string key = string(arg, 2, end - 2);
282
Mark Slee3e5d2d72007-06-15 01:45:56 +0000283 if (end != string::npos) {
Mark Sleee02385b2007-06-09 01:21:16 +0000284 args[key] = string(arg, end + 1);
285 } else {
286 args[key] = "true";
287 }
288 } else {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100289 throw invalid_argument("Unexcepted command line token: " + arg);
Mark Sleee02385b2007-06-09 01:21:16 +0000290 }
291 }
292
293 try {
294
Mark Slee3e5d2d72007-06-15 01:45:56 +0000295 if (!args["clients"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000296 clientCount = atoi(args["clients"].c_str());
297 }
298
Mark Slee3e5d2d72007-06-15 01:45:56 +0000299 if (!args["help"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000300 cerr << usage.str();
301 return 0;
302 }
303
Mark Slee3e5d2d72007-06-15 01:45:56 +0000304 if (!args["loop"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000305 loopCount = atoi(args["loop"].c_str());
306 }
307
Mark Slee3e5d2d72007-06-15 01:45:56 +0000308 if (!args["call"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000309 callName = args["call"];
310 }
311
Mark Slee3e5d2d72007-06-15 01:45:56 +0000312 if (!args["port"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000313 port = atoi(args["port"].c_str());
314 }
315
Mark Slee3e5d2d72007-06-15 01:45:56 +0000316 if (!args["server"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000317 runServer = args["server"] == "true";
318 }
319
Mark Slee3e5d2d72007-06-15 01:45:56 +0000320 if (!args["log-request"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000321 logRequests = args["log-request"] == "true";
322 }
323
Mark Slee3e5d2d72007-06-15 01:45:56 +0000324 if (!args["replay-request"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000325 replayRequests = args["replay-request"] == "true";
326 }
327
Mark Slee3e5d2d72007-06-15 01:45:56 +0000328 if (!args["server-type"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000329 serverType = args["server-type"];
330 }
331
Mark Slee3e5d2d72007-06-15 01:45:56 +0000332 if (!args["workers"].empty()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000333 workerCount = atoi(args["workers"].c_str());
334 }
335
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100336 } catch (std::exception& e) {
CJCombrink4a280d52024-03-14 19:57:41 +0100337 cerr << e.what() << '\n';
Jake Farrell5d02b802014-01-07 21:42:01 -0500338 cerr << usage.str();
Mark Sleee02385b2007-06-09 01:21:16 +0000339 }
340
cyyca8af9b2019-01-11 22:13:12 +0800341 std::shared_ptr<ThreadFactory> threadFactory
342 = std::shared_ptr<ThreadFactory>(new ThreadFactory());
Mark Sleee02385b2007-06-09 01:21:16 +0000343
344 // Dispatcher
cyy316723a2019-01-05 16:35:14 +0800345 std::shared_ptr<Server> serviceHandler(new Server());
Mark Sleee02385b2007-06-09 01:21:16 +0000346
347 if (replayRequests) {
cyy316723a2019-01-05 16:35:14 +0800348 std::shared_ptr<Server> serviceHandler(new Server());
349 std::shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
Mark Sleee02385b2007-06-09 01:21:16 +0000350
351 // Transports
cyy316723a2019-01-05 16:35:14 +0800352 std::shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
Mark Sleee02385b2007-06-09 01:21:16 +0000353 fileTransport->setChunkSize(2 * 1024 * 1024);
354 fileTransport->setMaxEventSize(1024 * 16);
355 fileTransport->seekToEnd();
356
357 // Protocol Factory
cyy316723a2019-01-05 16:35:14 +0800358 std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
Mark Sleee02385b2007-06-09 01:21:16 +0000359
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100360 TFileProcessor fileProcessor(serviceProcessor, protocolFactory, fileTransport);
Mark Sleee02385b2007-06-09 01:21:16 +0000361
362 fileProcessor.process(0, true);
363 exit(0);
364 }
365
Mark Slee3e5d2d72007-06-15 01:45:56 +0000366 if (runServer) {
Mark Sleee02385b2007-06-09 01:21:16 +0000367
cyy316723a2019-01-05 16:35:14 +0800368 std::shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
Mark Sleee02385b2007-06-09 01:21:16 +0000369
370 // Protocol Factory
cyy316723a2019-01-05 16:35:14 +0800371 std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
Mark Sleee02385b2007-06-09 01:21:16 +0000372
373 // Transport Factory
cyy316723a2019-01-05 16:35:14 +0800374 std::shared_ptr<TTransportFactory> transportFactory;
Mark Sleee02385b2007-06-09 01:21:16 +0000375
376 if (logRequests) {
377 // initialize the log file
cyy316723a2019-01-05 16:35:14 +0800378 std::shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
Mark Sleee02385b2007-06-09 01:21:16 +0000379 fileTransport->setChunkSize(2 * 1024 * 1024);
380 fileTransport->setMaxEventSize(1024 * 16);
381
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100382 transportFactory
cyy316723a2019-01-05 16:35:14 +0800383 = std::shared_ptr<TTransportFactory>(new TPipedTransportFactory(fileTransport));
Mark Sleee02385b2007-06-09 01:21:16 +0000384 }
385
cyy316723a2019-01-05 16:35:14 +0800386 std::shared_ptr<Thread> serverThread;
387 std::shared_ptr<Thread> serverThread2;
388 std::shared_ptr<transport::TNonblockingServerSocket> nbSocket1;
389 std::shared_ptr<transport::TNonblockingServerSocket> nbSocket2;
Mark Sleee02385b2007-06-09 01:21:16 +0000390
Mark Slee3e5d2d72007-06-15 01:45:56 +0000391 if (serverType == "simple") {
James E. King, III82ae9572017-08-05 12:23:54 -0400392
Divya Thaluru808d1432017-08-06 16:36:36 -0700393 nbSocket1.reset(new transport::TNonblockingServerSocket(port));
cyy316723a2019-01-05 16:35:14 +0800394 serverThread = threadFactory->newThread(std::shared_ptr<TServer>(
Divya Thaluru808d1432017-08-06 16:36:36 -0700395 new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket1)));
396 nbSocket2.reset(new transport::TNonblockingServerSocket(port + 1));
cyy316723a2019-01-05 16:35:14 +0800397 serverThread2 = threadFactory->newThread(std::shared_ptr<TServer>(
Divya Thaluru808d1432017-08-06 16:36:36 -0700398 new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket2)));
Mark Sleee02385b2007-06-09 01:21:16 +0000399
Mark Slee3e5d2d72007-06-15 01:45:56 +0000400 } else if (serverType == "thread-pool") {
Mark Sleee02385b2007-06-09 01:21:16 +0000401
cyy316723a2019-01-05 16:35:14 +0800402 std::shared_ptr<ThreadManager> threadManager
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100403 = ThreadManager::newSimpleThreadManager(workerCount);
Mark Sleee02385b2007-06-09 01:21:16 +0000404
405 threadManager->threadFactory(threadFactory);
406 threadManager->start();
Divya Thaluru808d1432017-08-06 16:36:36 -0700407 nbSocket1.reset(new transport::TNonblockingServerSocket(port));
cyy316723a2019-01-05 16:35:14 +0800408 serverThread = threadFactory->newThread(std::shared_ptr<TServer>(
Divya Thaluru808d1432017-08-06 16:36:36 -0700409 new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket1, threadManager)));
410 nbSocket2.reset(new transport::TNonblockingServerSocket(port + 1));
cyy316723a2019-01-05 16:35:14 +0800411 serverThread2 = threadFactory->newThread(std::shared_ptr<TServer>(
Divya Thaluru808d1432017-08-06 16:36:36 -0700412 new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket2, threadManager)));
Mark Sleee02385b2007-06-09 01:21:16 +0000413 }
414
CJCombrink4a280d52024-03-14 19:57:41 +0100415 cerr << "Starting the server on port " << port << " and " << (port + 1) << '\n';
Mark Sleee02385b2007-06-09 01:21:16 +0000416 serverThread->start();
Mark Slee79b16942007-11-26 19:05:29 +0000417 serverThread2->start();
Mark Sleee02385b2007-06-09 01:21:16 +0000418
419 // If we aren't running clients, just wait forever for external clients
420
421 if (clientCount == 0) {
422 serverThread->join();
Mark Slee79b16942007-11-26 19:05:29 +0000423 serverThread2->join();
Mark Sleee02385b2007-06-09 01:21:16 +0000424 }
425 }
Jake Farrell5d02b802014-01-07 21:42:01 -0500426 THRIFT_SLEEP_SEC(1);
Mark Sleee02385b2007-06-09 01:21:16 +0000427
428 if (clientCount > 0) {
429
430 Monitor monitor;
431
432 size_t threadCount = 0;
433
cyy316723a2019-01-05 16:35:14 +0800434 set<std::shared_ptr<Thread> > clientThreads;
Mark Sleee02385b2007-06-09 01:21:16 +0000435
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100436 if (callName == "echoVoid") {
437 loopType = T_VOID;
438 } else if (callName == "echoByte") {
439 loopType = T_BYTE;
440 } else if (callName == "echoI32") {
441 loopType = T_I32;
442 } else if (callName == "echoI64") {
443 loopType = T_I64;
444 } else if (callName == "echoString") {
445 loopType = T_STRING;
446 } else {
447 throw invalid_argument("Unknown service call " + callName);
448 }
Mark Sleee02385b2007-06-09 01:21:16 +0000449
Jake Farrell5d02b802014-01-07 21:42:01 -0500450 for (uint32_t ix = 0; ix < clientCount; ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000451
cyy316723a2019-01-05 16:35:14 +0800452 std::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port + (ix % 2)));
453 std::shared_ptr<TFramedTransport> framedSocket(new TFramedTransport(socket));
454 std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(framedSocket));
455 std::shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
Mark Sleee02385b2007-06-09 01:21:16 +0000456
cyy316723a2019-01-05 16:35:14 +0800457 clientThreads.insert(threadFactory->newThread(std::shared_ptr<ClientThread>(
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100458 new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType))));
Mark Sleee02385b2007-06-09 01:21:16 +0000459 }
460
Sebastian Zenker042580f2019-01-29 15:48:12 +0100461 for (auto thread = clientThreads.begin();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100462 thread != clientThreads.end();
463 thread++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000464 (*thread)->start();
465 }
466
Roger Meier5f9614c2010-11-21 16:59:05 +0000467 int64_t time00;
468 int64_t time01;
Mark Sleee02385b2007-06-09 01:21:16 +0000469
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100470 {
471 Synchronized s(monitor);
Mark Sleee02385b2007-06-09 01:21:16 +0000472 threadCount = clientCount;
473
CJCombrink4a280d52024-03-14 19:57:41 +0100474 cerr << "Launch " << clientCount << " client threads" << '\n';
Mark Sleee02385b2007-06-09 01:21:16 +0000475
cyybfdbd032019-01-12 14:38:28 +0800476 time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
Mark Sleee02385b2007-06-09 01:21:16 +0000477
478 monitor.notifyAll();
479
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100480 while (threadCount > 0) {
Mark Sleee02385b2007-06-09 01:21:16 +0000481 monitor.wait();
482 }
483
cyybfdbd032019-01-12 14:38:28 +0800484 time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
Mark Sleee02385b2007-06-09 01:21:16 +0000485 }
486
Roger Meier5f9614c2010-11-21 16:59:05 +0000487 int64_t firstTime = 9223372036854775807LL;
488 int64_t lastTime = 0;
Mark Sleee02385b2007-06-09 01:21:16 +0000489
490 double averageTime = 0;
Roger Meier5f9614c2010-11-21 16:59:05 +0000491 int64_t minTime = 9223372036854775807LL;
492 int64_t maxTime = 0;
Mark Sleee02385b2007-06-09 01:21:16 +0000493
Sebastian Zenker042580f2019-01-29 15:48:12 +0100494 for (auto ix = clientThreads.begin();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100495 ix != clientThreads.end();
496 ix++) {
Mark Sleee02385b2007-06-09 01:21:16 +0000497
cyy316723a2019-01-05 16:35:14 +0800498 std::shared_ptr<ClientThread> client
499 = std::dynamic_pointer_cast<ClientThread>((*ix)->runnable());
Mark Sleee02385b2007-06-09 01:21:16 +0000500
Roger Meier5f9614c2010-11-21 16:59:05 +0000501 int64_t delta = client->_endTime - client->_startTime;
Mark Sleee02385b2007-06-09 01:21:16 +0000502
503 assert(delta > 0);
504
Mark Slee3e5d2d72007-06-15 01:45:56 +0000505 if (client->_startTime < firstTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000506 firstTime = client->_startTime;
507 }
508
Mark Slee3e5d2d72007-06-15 01:45:56 +0000509 if (client->_endTime > lastTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000510 lastTime = client->_endTime;
511 }
512
Mark Slee3e5d2d72007-06-15 01:45:56 +0000513 if (delta < minTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000514 minTime = delta;
515 }
516
Mark Slee3e5d2d72007-06-15 01:45:56 +0000517 if (delta > maxTime) {
Mark Sleee02385b2007-06-09 01:21:16 +0000518 maxTime = delta;
519 }
520
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100521 averageTime += delta;
Mark Sleee02385b2007-06-09 01:21:16 +0000522 }
523
524 averageTime /= clientCount;
525
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100526 cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount
CJCombrink4a280d52024-03-14 19:57:41 +0100527 << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << '\n';
Mark Sleee02385b2007-06-09 01:21:16 +0000528
529 count_map count = serviceHandler->getCount();
530 count_map::iterator iter;
531 for (iter = count.begin(); iter != count.end(); ++iter) {
532 printf("%s => %d\n", iter->first, iter->second);
533 }
CJCombrink4a280d52024-03-14 19:57:41 +0100534 cerr << "done." << '\n';
Mark Sleee02385b2007-06-09 01:21:16 +0000535 }
536
537 return 0;
538}