THRIFT-2924 TNonblockingServer crashes when user-provided event_base is used
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index 705b0ac..587560c 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -1188,6 +1188,8 @@
if (!numIOThreads_) {
numIOThreads_ = DEFAULT_IO_THREADS;
}
+ // User-provided event-base doesn't works for multi-threaded servers
+ assert(numIOThreads_ == 1 || !userEventBase_);
for (uint32_t id = 0; id < numIOThreads_; ++id) {
// the first IO thread also does the listening on server socket
@@ -1243,7 +1245,8 @@
*/
void TNonblockingServer::serve() {
- registerEvents(NULL);
+ if(ioThreads_.empty())
+ registerEvents(NULL);
// Run the primary (listener) IO thread loop in our main thread; this will
// only return when the server is shutting down.
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.h b/lib/cpp/src/thrift/server/TNonblockingServer.h
index 4f23487..0a0d167 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.h
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.h
@@ -409,7 +409,11 @@
* PosixThreadFactory for the IO worker threads, because they must joinable
* for clean shutdown.
*/
- void setNumIOThreads(size_t numThreads) { numIOThreads_ = numThreads; }
+ void setNumIOThreads(size_t numThreads) {
+ numIOThreads_ = numThreads;
+ // User-provided event-base doesn't works for multi-threaded servers
+ assert(numIOThreads_ <= 1 || !userEventBase_);
+ }
/** Return whether the IO threads will get high scheduling priority */
bool useHighPriorityIOThreads() const { return useHighPriorityIOThreads_; }
diff --git a/lib/cpp/test/CMakeLists.txt b/lib/cpp/test/CMakeLists.txt
index d218da9..bb486df 100644
--- a/lib/cpp/test/CMakeLists.txt
+++ b/lib/cpp/test/CMakeLists.txt
@@ -210,6 +210,18 @@
${Boost_LIBRARIES}
)
add_test(NAME processor_test COMMAND processor_test)
+
+set(TNonblockingServerTest_SOURCES TNonblockingServerTest.cpp)
+add_executable(TNonblockingServerTest ${TNonblockingServerTest_SOURCES})
+include_directories(${LIBEVENT_INCLUDE_DIRS})
+target_link_libraries(TNonblockingServerTest
+ testgencpp_cob
+ thrift
+ thriftnb
+ ${LIBEVENT_LIBRARIES}
+ ${Boost_LIBRARIES}
+)
+add_test(NAME TNonblockingServerTest COMMAND TNonblockingServerTest)
endif()
if(OPENSSL_FOUND AND WITH_OPENSSL)
diff --git a/lib/cpp/test/TNonblockingServerTest.cpp b/lib/cpp/test/TNonblockingServerTest.cpp
index 2a73439..4aa4c28 100644
--- a/lib/cpp/test/TNonblockingServerTest.cpp
+++ b/lib/cpp/test/TNonblockingServerTest.cpp
@@ -26,6 +26,8 @@
#include "gen-cpp/ParentService.h"
+#include <event.h>
+
using namespace apache::thrift;
struct Handler : public test::ParentServiceIf {
@@ -57,9 +59,26 @@
}
};
+ struct EventDeleter {
+ void operator()(event_base* p) { event_base_free(p); }
+ };
+
protected:
Fixture() : processor(new test::ParentServiceProcessor(boost::make_shared<Handler>())) {}
+ ~Fixture() {
+ if (server) {
+ server->stop();
+ }
+ if (thread) {
+ thread->join();
+ }
+ }
+
+ void setEventBase(event_base* user_event_base) {
+ userEventBase_.reset(user_event_base, EventDeleter());
+ }
+
int startServer(int port) {
boost::scoped_ptr<concurrency::ThreadFactory> threadFactory(
new concurrency::PlatformThreadFactory(
@@ -73,6 +92,14 @@
int retry_count = port ? 10 : 0;
for (int p = port; p <= port + retry_count; p++) {
server.reset(new server::TNonblockingServer(processor, p));
+ if (userEventBase_) {
+ try {
+ server->registerEvents(userEventBase_.get());
+ } catch (const TException& x) {
+ // retry with next port
+ continue;
+ }
+ }
boost::shared_ptr<Runner> runner(new Runner);
runner->server = server;
thread = threadFactory->newThread(runner);
@@ -99,6 +126,7 @@
}
private:
+ boost::shared_ptr<event_base> userEventBase_;
boost::shared_ptr<test::ParentServiceProcessor> processor;
boost::shared_ptr<concurrency::Thread> thread;
@@ -129,4 +157,17 @@
BOOST_CHECK_EQUAL(server->getListenPort(), 0);
}
+BOOST_FIXTURE_TEST_CASE(provide_event_base, Fixture) {
+ event_base* eb = event_base_new();
+ setEventBase(eb);
+ startServer(0);
+
+ // assert that the server works
+ BOOST_CHECK(canCommunicate(server->getListenPort()));
+#if LIBEVENT_VERSION_NUMBER > 0x02010400
+ // also assert that the event_base is actually used when it's easy
+ BOOST_CHECK_GT(event_base_get_num_events(eb, EVENT_BASE_COUNT_ADDED), 0);
+#endif
+}
+
BOOST_AUTO_TEST_SUITE_END()