THRIFT-4384: fix concurrent sync in cpp async client code
diff --git a/build/cmake/DefineOptions.cmake b/build/cmake/DefineOptions.cmake
index eea0b29..778be8d 100644
--- a/build/cmake/DefineOptions.cmake
+++ b/build/cmake/DefineOptions.cmake
@@ -146,24 +146,34 @@
message(STATUS "----------------------------------------------------------")
message(STATUS "Thrift version: ${thrift_VERSION} (${thrift_VERSION_MAJOR}.${thrift_VERSION_MINOR}.${thrift_VERSION_PATCH})")
message(STATUS "Thrift package version: ${PACKAGE_VERSION}")
-message(STATUS "Build configuration Summary")
+message(STATUS)
+message(STATUS "Build configuration summary")
message(STATUS " Build compiler: ${BUILD_COMPILER}")
message(STATUS " Build libraries: ${BUILD_LIBRARIES}")
message(STATUS " Build tests: ${BUILD_TESTING}")
MESSAGE_DEP(HAVE_COMPILER "Disabled because BUILD_THRIFT=OFF and no valid THRIFT_COMPILER is given")
-if (UNIX)
- message(STATUS " Build type: ${CMAKE_BUILD_TYPE}")
-endif ()
-message(STATUS " Language libraries:")
+message(STATUS " Build type: ${CMAKE_BUILD_TYPE}")
+message(STATUS)
+message(STATUS "Language libraries:")
+message(STATUS)
message(STATUS " Build as3 library: ${BUILD_AS3}")
MESSAGE_DEP(WITH_AS3 "Disabled by WITH_AS3=OFF")
MESSAGE_DEP(HAVE_COMPC "Adobe Flex compc was not found - did you set env var FLEX_HOME?")
+message(STATUS)
message(STATUS " Build C++ library: ${BUILD_CPP}")
MESSAGE_DEP(WITH_CPP "Disabled by WITH_CPP=OFF")
-message(STATUS " C++ Language Level: ${CXX_LANGUAGE_LEVEL}")
+if (BUILD_CPP)
+ message(STATUS " C++ Language Level: ${CXX_LANGUAGE_LEVEL}")
+ message(STATUS " Build shared libraries: ${BUILD_SHARED_LIBS}")
+ message(STATUS " Build with libevent support: ${WITH_LIBEVENT}")
+ message(STATUS " Build with Qt5 support: ${WITH_QT5}")
+ message(STATUS " Build with ZLIB support: ${WITH_ZLIB}")
+endif ()
+message(STATUS)
message(STATUS " Build C (GLib) library: ${BUILD_C_GLIB}")
MESSAGE_DEP(WITH_C_GLIB "Disabled by WITH_C_GLIB=OFF")
MESSAGE_DEP(GLIB_FOUND "GLib missing")
+message(STATUS)
message(STATUS " Build Java library: ${BUILD_JAVA}")
MESSAGE_DEP(WITH_JAVA "Disabled by WITH_JAVA=OFF")
if(ANDROID)
@@ -172,19 +182,15 @@
MESSAGE_DEP(JAVA_FOUND "Java Runtime missing")
MESSAGE_DEP(GRADLEW_FOUND "Gradle Wrapper missing")
endif()
+message(STATUS)
message(STATUS " Build Python library: ${BUILD_PYTHON}")
MESSAGE_DEP(WITH_PYTHON "Disabled by WITH_PYTHON=OFF")
MESSAGE_DEP(PYTHONLIBS_FOUND "Python libraries missing")
+message(STATUS)
message(STATUS " Build Haskell library: ${BUILD_HASKELL}")
MESSAGE_DEP(WITH_HASKELL "Disabled by WITH_HASKELL=OFF")
MESSAGE_DEP(GHC_FOUND "GHC missing")
MESSAGE_DEP(CABAL_FOUND "Cabal missing")
-if (BUILD_CPP)
- message(STATUS " Library features:")
- message(STATUS " Build shared libraries: ${BUILD_SHARED_LIBS}")
- message(STATUS " Build with libevent support: ${WITH_LIBEVENT}")
- message(STATUS " Build with Qt5 support: ${WITH_QT5}")
- message(STATUS " Build with ZLIB support: ${WITH_ZLIB}")
-endif ()
+message(STATUS)
message(STATUS "----------------------------------------------------------")
endmacro(PRINT_CONFIG_SUMMARY)
diff --git a/compiler/cpp/src/thrift/generate/t_cpp_generator.cc b/compiler/cpp/src/thrift/generate/t_cpp_generator.cc
index cf30363..617ba7e 100644
--- a/compiler/cpp/src/thrift/generate/t_cpp_generator.cc
+++ b/compiler/cpp/src/thrift/generate/t_cpp_generator.cc
@@ -1720,7 +1720,6 @@
if (gen_cob_style_) {
f_header_ << "#include <thrift/transport/TBufferTransports.h>" << endl // TMemoryBuffer
<< "#include <functional>" << endl
- << "#include <memory>" << endl
<< "namespace apache { namespace thrift { namespace async {" << endl
<< "class TAsyncChannel;" << endl << "}}}" << endl;
}
@@ -1729,6 +1728,7 @@
f_header_ << "#include <thrift/async/TAsyncDispatchProcessor.h>" << endl;
}
f_header_ << "#include <thrift/async/TConcurrentClientSyncInfo.h>" << endl;
+ f_header_ << "#include <memory>" << endl;
f_header_ << "#include \"" << get_include_prefix(*get_program()) << program_name_ << "_types.h\""
<< endl;
@@ -2284,27 +2284,49 @@
indent_up();
if (style != "Cob") {
f_header_ << indent() << service_name_ << style << "Client" << short_suffix << "(" << prot_ptr
- << " prot) ";
+ << " prot";
+ if (style == "Concurrent") {
+ f_header_ << ", std::shared_ptr<::apache::thrift::async::TConcurrentClientSyncInfo> sync";
+ }
+ f_header_ << ") ";
if (extends.empty()) {
+ if (style == "Concurrent") {
+ f_header_ << ": sync_(sync)" << endl;
+ }
f_header_ << "{" << endl;
f_header_ << indent() << " setProtocol" << short_suffix << "(prot);" << endl << indent()
<< "}" << endl;
} else {
f_header_ << ":" << endl;
- f_header_ << indent() << " " << extends << style << client_suffix << "(prot, prot) {}"
- << endl;
+ f_header_ << indent() << " " << extends << style << client_suffix << "(prot, prot";
+ if (style == "Concurrent") {
+ f_header_ << ", sync";
+ }
+ f_header_ << ") {}" << endl;
}
f_header_ << indent() << service_name_ << style << "Client" << short_suffix << "(" << prot_ptr
- << " iprot, " << prot_ptr << " oprot) ";
+ << " iprot, " << prot_ptr << " oprot";
+ if (style == "Concurrent") {
+ f_header_ << ", std::shared_ptr<::apache::thrift::async::TConcurrentClientSyncInfo> sync";
+ }
+ f_header_ << ") ";
+
if (extends.empty()) {
+ if (style == "Concurrent") {
+ f_header_ << ": sync_(sync)" << endl;
+ }
f_header_ << "{" << endl;
f_header_ << indent() << " setProtocol" << short_suffix << "(iprot,oprot);" << endl
<< indent() << "}" << endl;
} else {
f_header_ << ":" << indent() << " " << extends << style << client_suffix
- << "(iprot, oprot) {}" << endl;
+ << "(iprot, oprot";
+ if (style == "Concurrent") {
+ f_header_ << ", sync";
+ }
+ f_header_ << ") {}" << endl;
}
// create the setProtocol methods
@@ -2443,7 +2465,7 @@
if (style == "Concurrent") {
f_header_ <<
- indent() << "::apache::thrift::async::TConcurrentClientSyncInfo sync_;"<<endl;
+ indent() << "std::shared_ptr<::apache::thrift::async::TConcurrentClientSyncInfo> sync_;"<<endl;
}
indent_down();
}
@@ -2549,7 +2571,7 @@
string cseqidVal = "0";
if (style == "Concurrent") {
if (!(*f_iter)->is_oneway()) {
- cseqidVal = "this->sync_.generateSeqId()";
+ cseqidVal = "this->sync_->generateSeqId()";
}
}
// Serialize the request
@@ -2557,7 +2579,7 @@
indent() << "int32_t cseqid = " << cseqidVal << ";" << endl;
if(style == "Concurrent") {
out <<
- indent() << "::apache::thrift::async::TConcurrentSendSentry sentry(&this->sync_);" << endl;
+ indent() << "::apache::thrift::async::TConcurrentSendSentry sentry(this->sync_.get());" << endl;
}
if (style == "Cob") {
out <<
@@ -2622,7 +2644,7 @@
endl <<
indent() << "// the read mutex gets dropped and reacquired as part of waitForWork()" << endl <<
indent() << "// The destructor of this sentry wakes up other clients" << endl <<
- indent() << "::apache::thrift::async::TConcurrentRecvSentry sentry(&this->sync_, seqid);" << endl;
+ indent() << "::apache::thrift::async::TConcurrentRecvSentry sentry(this->sync_.get(), seqid);" << endl;
}
if (style == "Cob" && !gen_no_client_completion_) {
out << indent() << "bool completed = false;" << endl << endl << indent() << "try {";
@@ -2632,7 +2654,7 @@
if (style == "Concurrent") {
out <<
indent() << "while(true) {" << endl <<
- indent() << " if(!this->sync_.getPending(fname, mtype, rseqid)) {" << endl;
+ indent() << " if(!this->sync_->getPending(fname, mtype, rseqid)) {" << endl;
indent_up();
indent_up();
}
@@ -2776,10 +2798,10 @@
out <<
indent() << " }" << endl <<
indent() << " // seqid != rseqid" << endl <<
- indent() << " this->sync_.updatePending(fname, mtype, rseqid);" << endl <<
+ indent() << " this->sync_->updatePending(fname, mtype, rseqid);" << endl <<
endl <<
indent() << " // this will temporarily unlock the readMutex, and let other clients get work done" << endl <<
- indent() << " this->sync_.waitForWork(seqid);" << endl <<
+ indent() << " this->sync_->waitForWork(seqid);" << endl <<
indent() << "} // end while(true)" << endl;
}
if (style == "Cob" && !gen_no_client_completion_) {
diff --git a/test/cpp/CMakeLists.txt b/test/cpp/CMakeLists.txt
index 0c0bd2f..90af782 100755
--- a/test/cpp/CMakeLists.txt
+++ b/test/cpp/CMakeLists.txt
@@ -75,6 +75,7 @@
LINK_AGAINST_THRIFT_LIBRARY(StressTest thrift)
LINK_AGAINST_THRIFT_LIBRARY(StressTest thriftnb)
add_test(NAME StressTest COMMAND StressTest)
+add_test(NAME StressTestConcurrent COMMAND StressTest --client-type=concurrent)
add_executable(StressTestNonBlocking src/StressTestNonBlocking.cpp)
target_link_libraries(StressTestNonBlocking crossstressgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB})
diff --git a/test/cpp/src/StressTest.cpp b/test/cpp/src/StressTest.cpp
index 1683125..79a708e 100644
--- a/test/cpp/src/StressTest.cpp
+++ b/test/cpp/src/StressTest.cpp
@@ -44,6 +44,7 @@
using namespace std;
using namespace apache::thrift;
+using namespace apache::thrift::async;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::server;
@@ -264,8 +265,8 @@
string clientType = "regular";
string serverType = "thread-pool";
string protocolType = "binary";
- size_t workerCount = 4;
- size_t clientCount = 20;
+ size_t workerCount = 8;
+ size_t clientCount = 4;
size_t loopCount = 50000;
TType loopType = T_VOID;
string callName = "echoVoid";
@@ -515,8 +516,8 @@
std::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port));
std::shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(bufferedSocket));
- //std::shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
- std::shared_ptr<ServiceConcurrentClient> serviceClient(new ServiceConcurrentClient(protocol));
+ auto sync = std::make_shared<TConcurrentClientSyncInfo>();
+ std::shared_ptr<ServiceConcurrentClient> serviceClient(new ServiceConcurrentClient(protocol, sync));
socket->open();
for (size_t ix = 0; ix < clientCount; ix++) {
clientThreads.insert(threadFactory->newThread(std::shared_ptr<ClientThread>(