THRIFT-3247 Generate a C++ thread-safe client
Client: cpp
Patch: Ben Craig <bencraig@apache.org>
diff --git a/test/cpp/src/StressTest.cpp b/test/cpp/src/StressTest.cpp
index fa468a4..9371bce 100644
--- a/test/cpp/src/StressTest.cpp
+++ b/test/cpp/src/StressTest.cpp
@@ -33,7 +33,6 @@
#include <thrift/TLogging.h>
#include "Service.h"
-
#include <iostream>
#include <set>
#include <stdexcept>
@@ -102,20 +101,26 @@
Mutex lock_;
};
+enum TransportOpenCloseBehavior { // Selects whether ClientThread::run() itself calls open()/close() on its transport.
+ OpenAndCloseTransportInThread, // run() opens the transport before its call loop and closes it afterwards.
+ DontOpenAndCloseTransportInThread // run() neither opens nor closes the transport; whoever created the thread must open it first.
+};
class ClientThread : public Runnable {
public:
ClientThread(boost::shared_ptr<TTransport> transport,
- boost::shared_ptr<ServiceClient> client,
+ boost::shared_ptr<ServiceIf> client,
Monitor& monitor,
size_t& workerCount,
size_t loopCount,
- TType loopType)
+ TType loopType,
+ TransportOpenCloseBehavior behavior)
: _transport(transport),
_client(client),
_monitor(monitor),
_workerCount(workerCount),
_loopCount(loopCount),
- _loopType(loopType) {}
+ _loopType(loopType),
+ _behavior(behavior) {}
void run() {
@@ -129,8 +134,9 @@
}
_startTime = Util::currentTime();
-
- _transport->open();
+ if(_behavior == OpenAndCloseTransportInThread) { // skipped when the creator of this thread has already opened the transport
+ _transport->open();
+ }
switch (_loopType) {
case T_VOID:
@@ -155,7 +161,9 @@
_endTime = Util::currentTime();
- _transport->close();
+ if(_behavior == OpenAndCloseTransportInThread) { // only close a transport that this thread opened itself
+ _transport->close();
+ }
_done = true;
@@ -217,7 +225,7 @@
}
boost::shared_ptr<TTransport> _transport;
- boost::shared_ptr<ServiceClient> _client;
+ boost::shared_ptr<ServiceIf> _client;
Monitor& _monitor;
size_t& _workerCount;
size_t _loopCount;
@@ -226,6 +234,7 @@
int64_t _endTime;
bool _done;
Monitor _sleep;
+ TransportOpenCloseBehavior _behavior; // whether run() opens and closes _transport itself
};
class TStartObserver : public apache::thrift::server::TServerEventHandler {
@@ -253,6 +262,7 @@
#endif
int port = 9091;
+ string clientType = "regular";
string serverType = "thread-pool";
string protocolType = "binary";
size_t workerCount = 4;
@@ -269,23 +279,23 @@
usage << argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] "
"[--protocol-type=<protocol-type>] [--workers=<worker-count>] "
- "[--clients=<client-count>] [--loop=<loop-count>]" << endl
+ "[--clients=<client-count>] [--loop=<loop-count>] "
+ "[--client-type=<client-type>]" << endl
<< "\tclients Number of client threads to create - 0 implies no clients, i.e. "
- "server only. Default is " << clientCount << endl
+ "server only. Default is " << clientCount << endl
<< "\thelp Prints this help text." << endl
<< "\tcall Service method to call. Default is " << callName << endl
- << "\tloop The number of remote thrift calls each client makes. Default is "
- << loopCount << endl << "\tport The port the server and clients should bind to "
- "for thrift network connections. Default is " << port << endl
- << "\tserver Run the Thrift server in this process. Default is " << runServer
- << endl << "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is "
- << serverType << endl
- << "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is "
- << protocolType << endl
- << "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests
- << endl << "\treplay-request Replay requests from log file (./requestlog.tlog) Default is "
- << replayRequests << endl << "\tworkers Number of thread pools workers. Only valid "
- "for thread-pool server type. Default is " << workerCount
+ << "\tloop The number of remote thrift calls each client makes. Default is " << loopCount << endl
+ << "\tport The port the server and clients should bind to "
+ "for thrift network connections. Default is " << port << endl
+ << "\tserver Run the Thrift server in this process. Default is " << runServer << endl
+ << "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl
+ << "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl
+ << "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests << endl
+ << "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " << replayRequests << endl
+ << "\tworkers Number of thread pools workers. Only valid "
+ "for thread-pool server type. Default is " << workerCount << endl
+ << "\tclient-type Type of client, \"regular\" or \"concurrent\". Default is " << clientType << endl
<< endl;
map<string, string> args;
@@ -359,7 +369,18 @@
throw invalid_argument("Unknown server type " + serverType);
}
}
+ if (!args["client-type"].empty()) {
+ clientType = args["client-type"];
+ if (clientType == "regular") {
+ // Accepted: each client thread gets its own socket and ServiceClient.
+ } else if (clientType == "concurrent") {
+ // Accepted: all client threads share one ServiceConcurrentClient.
+ } else {
+ // Any other value is rejected here, before clients are created.
+ throw invalid_argument("Unknown client type " + clientType);
+ }
+ }
if (!args["workers"].empty()) {
workerCount = atoi(args["workers"].c_str());
}
@@ -458,7 +479,7 @@
}
}
- if (clientCount > 0) {
+ if (clientCount > 0) { // client-type specific setup is done further down, where the client threads are created
Monitor monitor;
@@ -480,15 +501,28 @@
throw invalid_argument("Unknown service call " + callName);
}
- for (size_t ix = 0; ix < clientCount; ix++) {
+ if(clientType == "regular") { // one socket, protocol stack and ServiceClient per client thread
+ for (size_t ix = 0; ix < clientCount; ix++) {
+ boost::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port));
+ boost::shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
+ boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(bufferedSocket));
+ boost::shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
+ // The socket is not opened here: each thread opens and closes its own socket inside run().
+ clientThreads.insert(threadFactory->newThread(boost::shared_ptr<ClientThread>(
+ new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType, OpenAndCloseTransportInThread))));
+ }
+ } else if(clientType == "concurrent") { // one socket and one ServiceConcurrentClient shared by every client thread
boost::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port));
boost::shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(bufferedSocket));
- boost::shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
-
- clientThreads.insert(threadFactory->newThread(boost::shared_ptr<ClientThread>(
- new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType))));
+ // ServiceConcurrentClient (rather than ServiceClient) is the single object every client thread below calls through.
+ boost::shared_ptr<ServiceConcurrentClient> serviceClient(new ServiceConcurrentClient(protocol));
+ socket->open(); // opened once here; the threads are told not to open or close it
+ for (size_t ix = 0; ix < clientCount; ix++) {
+ clientThreads.insert(threadFactory->newThread(boost::shared_ptr<ClientThread>(
+ new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType, DontOpenAndCloseTransportInThread))));
+ }
}
for (std::set<boost::shared_ptr<Thread> >::const_iterator thread = clientThreads.begin();
@@ -504,7 +538,7 @@
Synchronized s(monitor);
threadCount = clientCount;
- cerr << "Launch " << clientCount << " client threads" << endl;
+ cerr << "Launch " << clientCount << " " << clientType << " client threads" << endl;
time00 = Util::currentTime();