THRIFT-3768 fix TThreadedServer refactoring issues with client lifetime guarantees
Client: C++
Patch: Jim King <jim.king@simplivity.com>
This closes #980
diff --git a/lib/cpp/src/thrift/concurrency/Thread.h b/lib/cpp/src/thrift/concurrency/Thread.h
index f5eb3a8..f7c7bd6 100644
--- a/lib/cpp/src/thrift/concurrency/Thread.h
+++ b/lib/cpp/src/thrift/concurrency/Thread.h
@@ -135,18 +135,32 @@
* object for execution
*/
class ThreadFactory {
-
public:
virtual ~ThreadFactory() {}
+
+ /**
+ * Gets current detached mode
+ */
+ virtual bool isDetached() const = 0;
+
+ /**
+ * Create a new thread.
+ */
virtual boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const = 0;
- /** Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread
+ /**
+ * Sets detached mode of threads
*/
+ virtual void setDetached(bool detached) = 0;
static const Thread::id_t unknown_thread_id;
+ /**
+ * Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread
+ */
virtual Thread::id_t getCurrentThreadId() const = 0;
};
+
}
}
} // apache::thrift::concurrency
diff --git a/lib/cpp/src/thrift/server/TServerFramework.cpp b/lib/cpp/src/thrift/server/TServerFramework.cpp
index 56b6cca..b62cf40 100644
--- a/lib/cpp/src/thrift/server/TServerFramework.cpp
+++ b/lib/cpp/src/thrift/server/TServerFramework.cpp
@@ -17,6 +17,7 @@
* under the License.
*/
+#include <algorithm>
#include <boost/bind.hpp>
#include <stdexcept>
#include <stdint.h>
@@ -221,25 +222,25 @@
}
void TServerFramework::newlyConnectedClient(const boost::shared_ptr<TConnectedClient>& pClient) {
- onClientConnected(pClient);
+ {
+ Synchronized sync(mon_);
+ ++clients_;
+ hwm_ = (std::max)(hwm_, clients_);
+ }
- // Count a concurrent client added.
- Synchronized sync(mon_);
- ++clients_;
- hwm_ = (std::max)(hwm_, clients_);
+ onClientConnected(pClient);
}
void TServerFramework::disposeConnectedClient(TConnectedClient* pClient) {
- {
- // Count a concurrent client removed.
- Synchronized sync(mon_);
- if (limit_ - --clients_ > 0) {
- mon_.notify();
- }
- }
onClientDisconnected(pClient);
delete pClient;
+
+ Synchronized sync(mon_);
+ if (limit_ - --clients_ > 0) {
+ mon_.notify();
+ }
}
+
}
}
} // apache::thrift::server
diff --git a/lib/cpp/src/thrift/server/TServerFramework.h b/lib/cpp/src/thrift/server/TServerFramework.h
index a22688a..53d9bfd 100644
--- a/lib/cpp/src/thrift/server/TServerFramework.h
+++ b/lib/cpp/src/thrift/server/TServerFramework.h
@@ -123,8 +123,8 @@
protected:
/**
- * A client has connected. The implementation is responsible for storing
- * and processing the client. This is called during the serve() thread,
+ * A client has connected. The implementation is responsible for managing the
+ * lifetime of the client object. This is called on the serve() thread,
* therefore a failure to return quickly will result in new client connection
* delays.
*
@@ -134,9 +134,10 @@
/**
* A client has disconnected.
- * The server no longer tracks the client.
- * The client TTransport has already been closed.
- * The implementation must not delete the pointer.
+ * When called:
+ * The server no longer tracks the client.
+ * The client TTransport has already been closed.
+ * The implementation must not delete the pointer.
*
* \param[in] pClient the disconnected client
*/
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp
index 92f5cf8..e15f8f1 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp
@@ -17,6 +17,11 @@
* under the License.
*/
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/shared_ptr.hpp>
+#include <string>
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/server/TThreadedServer.h>
@@ -24,6 +29,7 @@
namespace thrift {
namespace server {
+using apache::thrift::concurrency::Runnable;
using apache::thrift::concurrency::Synchronized;
using apache::thrift::concurrency::Thread;
using apache::thrift::concurrency::ThreadFactory;
@@ -34,7 +40,6 @@
using apache::thrift::transport::TTransportException;
using apache::thrift::transport::TTransportFactory;
using boost::shared_ptr;
-using std::string;
TThreadedServer::TThreadedServer(const shared_ptr<TProcessorFactory>& processorFactory,
const shared_ptr<TServerTransport>& serverTransport,
@@ -90,31 +95,68 @@
}
void TThreadedServer::serve() {
+ threadFactory_->setDetached(false);
TServerFramework::serve();
- // Drain all clients - no more will arrive
- try {
- Synchronized s(clientsMonitor_);
- while (getConcurrentClientCount() > 0) {
- clientsMonitor_.wait();
- }
- } catch (TException& tx) {
- string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
- GlobalOutput(errStr.c_str());
+ // Ensure post-condition of no active clients
+ Synchronized s(clientMonitor_);
+ while (!activeClientMap_.empty()) {
+ clientMonitor_.wait();
+ }
+
+ drainDeadClients();
+}
+
+void TThreadedServer::drainDeadClients() {
+ // the caller must already hold clientMonitor_
+ while (!deadClientMap_.empty()) {
+ ClientMap::iterator it = deadClientMap_.begin();
+ it->second->join();
+ deadClientMap_.erase(it);
}
}
void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) {
- threadFactory_->newThread(pClient)->start();
+ Synchronized sync(clientMonitor_);
+ ClientMap::iterator it = activeClientMap_.insert(ClientMap::value_type(pClient.get(), boost::make_shared<TConnectedClientRunner>(pClient))).first;
+ boost::shared_ptr<apache::thrift::concurrency::Thread> pThread = threadFactory_->newThread(it->second);
+ it->second->setThread(pThread);
+ pThread->start();
}
void TThreadedServer::onClientDisconnected(TConnectedClient* pClient) {
- THRIFT_UNUSED_VARIABLE(pClient);
- Synchronized s(clientsMonitor_);
- if (getConcurrentClientCount() == 0) {
- clientsMonitor_.notify();
+ Synchronized sync(clientMonitor_);
+ drainDeadClients(); // use the outgoing thread to do some maintenance on our dead client backlog
+ ClientMap::iterator it = activeClientMap_.find(pClient);
+ ClientMap::iterator end = it;
+ deadClientMap_.insert(it, ++end);
+ activeClientMap_.erase(it);
+ if (activeClientMap_.empty()) {
+ clientMonitor_.notify();
}
}
+
+TThreadedServer::TConnectedClientRunner::TConnectedClientRunner(const boost::shared_ptr<TConnectedClient>& pClient)
+ : pClient_(pClient) {
+}
+
+TThreadedServer::TConnectedClientRunner::~TConnectedClientRunner() {
+}
+
+void TThreadedServer::TConnectedClientRunner::join() {
+ pThread_->join();
+}
+
+void TThreadedServer::TConnectedClientRunner::run() /* override */ {
+ pClient_->run(); // Run the client
+ pClient_.reset(); // The client is done - release it here rather than in the destructor for safety
+}
+
+void TThreadedServer::TConnectedClientRunner::setThread(
+ const boost::shared_ptr<apache::thrift::concurrency::Thread>& pThread) {
+ pThread_ = pThread;
+}
+
}
}
} // apache::thrift::server
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h
index cdacfd7..0f2cce9 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.h
+++ b/lib/cpp/src/thrift/server/TThreadedServer.h
@@ -20,6 +20,7 @@
#ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_
#define _THRIFT_SERVER_TTHREADEDSERVER_H_ 1
+#include <map>
#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/concurrency/Thread.h>
@@ -30,7 +31,11 @@
namespace server {
/**
- * Manage clients using a thread pool.
+ * Manage clients using threads - one thread is created for each client and is
+ * released when the client disconnects. This makes the server dynamically
+ * scalable up to the concurrent connection limit.
+ *
+ * The thread factory is switched to non-detached mode so client threads can be joined.
*/
class TThreadedServer : public TServerFramework {
public:
@@ -83,12 +88,59 @@
virtual void serve();
protected:
+ /**
+ * Drain recently disconnected clients by joining their threads - this is done lazily because
+ * we cannot do it inside the thread context that is disconnecting.
+ */
+ virtual void drainDeadClients();
+
+ /**
+ * Implementation of TServerFramework::onClientConnected
+ */
virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */;
- virtual void onClientDisconnected(TConnectedClient* pClient) /* override */;
+
+ /**
+ * Implementation of TServerFramework::onClientDisconnected
+ */
+ virtual void onClientDisconnected(TConnectedClient *pClient) /* override */;
boost::shared_ptr<apache::thrift::concurrency::ThreadFactory> threadFactory_;
- apache::thrift::concurrency::Monitor clientsMonitor_;
+
+ /**
+ * A helper wrapper around the client that we can use to maintain the lifetime
+ * of the connected client within its (non-detached) worker thread. We cannot simply
+ * track the threads because a shared_ptr<Thread> hangs on to the Runnable it is
+ * passed, and TServerFramework requires the runnable (TConnectedClient) to be
+ * destroyed in order to work properly.
+ */
+ class TConnectedClientRunner : public apache::thrift::concurrency::Runnable
+ {
+ public:
+ TConnectedClientRunner(const boost::shared_ptr<TConnectedClient>& pClient);
+ virtual ~TConnectedClientRunner();
+ void join();
+ void run() /* override */;
+ void setThread(const boost::shared_ptr<apache::thrift::concurrency::Thread>& pThread);
+ private:
+ boost::shared_ptr<TConnectedClient> pClient_;
+ boost::shared_ptr<apache::thrift::concurrency::Thread> pThread_;
+ };
+
+ apache::thrift::concurrency::Monitor clientMonitor_;
+
+ typedef std::map<TConnectedClient *, boost::shared_ptr<TConnectedClientRunner> > ClientMap;
+
+ /**
+ * A map of active clients
+ */
+ ClientMap activeClientMap_;
+
+ /**
+ * A map of clients that have disconnected but their threads have not been joined
+ */
+ ClientMap deadClientMap_;
};
+
}
}
} // apache::thrift::server
diff --git a/lib/cpp/test/TServerIntegrationTest.cpp b/lib/cpp/test/TServerIntegrationTest.cpp
index ce1cbd3..2180448 100644
--- a/lib/cpp/test/TServerIntegrationTest.cpp
+++ b/lib/cpp/test/TServerIntegrationTest.cpp
@@ -19,7 +19,9 @@
#define BOOST_TEST_MODULE TServerIntegrationTest
#include <boost/test/auto_unit_test.hpp>
+#include <boost/atomic.hpp>
#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/ptime.hpp>
#include <boost/foreach.hpp>
#include <boost/format.hpp>
#include <boost/make_shared.hpp>
@@ -33,6 +35,7 @@
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransport.h>
#include "gen-cpp/ParentService.h"
+#include <string>
#include <vector>
using apache::thrift::concurrency::Guard;
@@ -152,7 +155,10 @@
new TServerSocket("localhost", 0)),
boost::shared_ptr<TTransportFactory>(new TTransportFactory),
boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
- pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)) {
+ pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)),
+ bStressDone(false),
+ bStressConnectionCount(0),
+ bStressRequestCount(0) {
pServer->setServerEventHandler(pEventHandler);
}
@@ -162,7 +168,10 @@
boost::shared_ptr<TServerTransport>(new TServerSocket("localhost", 0)),
boost::shared_ptr<TTransportFactory>(new TTransportFactory),
boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
- pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)) {
+ pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)),
+ bStressDone(false),
+ bStressConnectionCount(0),
+ bStressRequestCount(0) {
pServer->setServerEventHandler(pEventHandler);
}
@@ -175,7 +184,7 @@
pEventHandler->wait();
}
- BOOST_TEST_MESSAGE("server is listening");
+ BOOST_TEST_MESSAGE(" server is listening");
}
void blockUntilAccepted(uint64_t numAccepted) {
@@ -184,34 +193,35 @@
pEventHandler->wait();
}
- BOOST_TEST_MESSAGE(boost::format("server has accepted %1%") % numAccepted);
+ BOOST_TEST_MESSAGE(boost::format(" server has accepted %1%") % numAccepted);
}
void stopServer() {
if (pServerThread) {
pServer->stop();
- BOOST_TEST_MESSAGE("server stop completed");
+ BOOST_TEST_MESSAGE(" server stop completed");
pServerThread->join();
- BOOST_TEST_MESSAGE("server thread joined");
+ BOOST_TEST_MESSAGE(" server thread joined");
pServerThread.reset();
}
}
~TServerIntegrationTestFixture() { stopServer(); }
- int getServerPort() {
- TServerSocket* pSock = dynamic_cast<TServerSocket*>(pServer->getServerTransport().get());
- return pSock->getPort();
- }
+ /**
+ * Performs a baseline test where some clients are opened and issue a single operation
+ * and then disconnect at different intervals.
+ * \param[in] numToMake the number of concurrent clients
+ * \param[in] expectedHWM the high water mark we expect of concurrency
+ * \param[in] purpose a description of the test for logging purposes
+ */
+ void baseline(int64_t numToMake, int64_t expectedHWM, const std::string& purpose) {
+ BOOST_TEST_MESSAGE(boost::format("Testing %1%: %2% with %3% clients, expect %4% HWM")
+ % typeid(TServerType).name() % purpose % numToMake % expectedHWM);
- void delayClose(boost::shared_ptr<TTransport> toClose, boost::posix_time::time_duration after) {
- boost::this_thread::sleep(after);
- toClose->close();
- }
+ startServer();
- void baseline(int64_t numToMake, int64_t expectedHWM) {
- startServer();
std::vector<boost::shared_ptr<TSocket> > holdSockets;
std::vector<boost::shared_ptr<boost::thread> > holdThreads;
@@ -227,19 +237,91 @@
new boost::thread(boost::bind(&TServerIntegrationTestFixture::delayClose,
this,
pClientSock,
- milliseconds(100 * numToMake)))));
+ milliseconds(10 * numToMake)))));
}
BOOST_CHECK_EQUAL(expectedHWM, pServer->getConcurrentClientCountHWM());
- stopServer();
+
BOOST_FOREACH (boost::shared_ptr<boost::thread> pThread, holdThreads) { pThread->join(); }
holdThreads.clear();
holdSockets.clear();
+
+ stopServer();
+ }
+
+ /**
+ * Helper method used to close a connection after a delay.
+ * \param[in] toClose the connection to close
+ * \param[in] after the delay to impose
+ */
+ void delayClose(boost::shared_ptr<TTransport> toClose, boost::posix_time::time_duration after) {
+ boost::this_thread::sleep(after);
+ toClose->close();
+ }
+
+ /**
+ * \returns the server port number
+ */
+ int getServerPort() {
+ TServerSocket* pSock = dynamic_cast<TServerSocket*>(pServer->getServerTransport().get());
+ return pSock->getPort();
+ }
+
+ /**
+ * Performs a stress test for concurrency correctness: spawns threads that repeatedly
+ * connect, issue a varying number of requests and disconnect until the duration elapses.
+ * \param[in] numToMake the number of concurrent clients
+ * \param[in] duration how long to keep the stress test running
+ */
+ void stress(int64_t numToMake, const boost::posix_time::time_duration& duration) {
+ BOOST_TEST_MESSAGE(boost::format("Stress testing %1% with %2% clients for %3% seconds")
+ % typeid(TServerType).name() % numToMake % duration.total_seconds());
+
+ startServer();
+
+ std::vector<boost::shared_ptr<boost::thread> > holdThreads;
+ for (int64_t i = 0; i < numToMake; ++i) {
+ holdThreads.push_back(boost::shared_ptr<boost::thread>(
+ new boost::thread(boost::bind(&TServerIntegrationTestFixture::stressor, this))));
+ }
+
+ boost::this_thread::sleep(duration);
+ bStressDone = true;
+
+ BOOST_TEST_MESSAGE(boost::format(" serviced %1% connections (HWM %2%) totaling %3% requests")
+ % bStressConnectionCount % pServer->getConcurrentClientCountHWM() % bStressRequestCount);
+
+ BOOST_FOREACH (boost::shared_ptr<boost::thread> pThread, holdThreads) { pThread->join(); }
+ holdThreads.clear();
+
+ BOOST_CHECK(bStressRequestCount > 0);
+
+ stopServer();
+ }
+
+ /**
+ * Helper method to stress the system
+ */
+ void stressor() {
+ while (!bStressDone) {
+ boost::shared_ptr<TSocket> pSocket(new TSocket("localhost", getServerPort()), autoSocketCloser);
+ boost::shared_ptr<TProtocol> pProtocol(new TBinaryProtocol(pSocket));
+ ParentServiceClient client(pProtocol);
+ pSocket->open();
+ bStressConnectionCount.fetch_add(1, boost::memory_order_relaxed);
+ for (int i = 0; i < rand() % 1000; ++i) {
+ client.incrementGeneration();
+ bStressRequestCount.fetch_add(1, boost::memory_order_relaxed);
+ }
+ }
}
boost::shared_ptr<TServerType> pServer;
boost::shared_ptr<TServerReadyEventHandler> pEventHandler;
boost::shared_ptr<boost::thread> pServerThread;
+ bool bStressDone;
+ boost::atomic_int64_t bStressConnectionCount;
+ boost::atomic_int64_t bStressRequestCount;
};
template <class TServerType>
@@ -264,26 +346,31 @@
BOOST_FIXTURE_TEST_CASE(test_simple_factory,
TServerIntegrationProcessorFactoryTestFixture<TSimpleServer>) {
- baseline(3, 1);
+ baseline(3, 1, "factory");
}
BOOST_FIXTURE_TEST_CASE(test_simple, TServerIntegrationProcessorTestFixture<TSimpleServer>) {
- baseline(3, 1);
+ baseline(3, 1, "processor");
}
BOOST_FIXTURE_TEST_CASE(test_threaded_factory,
TServerIntegrationProcessorFactoryTestFixture<TThreadedServer>) {
- baseline(10, 10);
+ baseline(10, 10, "factory");
}
BOOST_FIXTURE_TEST_CASE(test_threaded, TServerIntegrationProcessorTestFixture<TThreadedServer>) {
- baseline(10, 10);
+ baseline(10, 10, "processor");
}
BOOST_FIXTURE_TEST_CASE(test_threaded_bound,
TServerIntegrationProcessorTestFixture<TThreadedServer>) {
pServer->setConcurrentClientLimit(4);
- baseline(10, 4);
+ baseline(10, 4, "limit by server framework");
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threaded_stress,
+ TServerIntegrationProcessorFactoryTestFixture<TThreadedServer>) {
+ stress(10, boost::posix_time::seconds(3));
}
BOOST_FIXTURE_TEST_CASE(test_threadpool_factory,
@@ -298,7 +385,7 @@
// as accept() will be called to grab a 5th client socket, in this case
// and then the thread factory will block adding the thread to manage
// that client.
- baseline(10, 5);
+ baseline(10, 5, "limit by thread manager");
}
BOOST_FIXTURE_TEST_CASE(test_threadpool,
@@ -313,7 +400,7 @@
// as accept() will be called to grab a 5th client socket, in this case
// and then the thread factory will block adding the thread to manage
// that client.
- baseline(10, 5);
+ baseline(10, 5, "limit by thread manager");
}
BOOST_FIXTURE_TEST_CASE(test_threadpool_bound,
@@ -324,7 +411,17 @@
pServer->getThreadManager()->start();
pServer->setConcurrentClientLimit(4);
- baseline(10, 4);
+ baseline(10, 4, "server framework connection limit");
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threadpool_stress,
+ TServerIntegrationProcessorTestFixture<TThreadPoolServer>) {
+ pServer->getThreadManager()->threadFactory(
+ boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+ new apache::thrift::concurrency::PlatformThreadFactory));
+ pServer->getThreadManager()->start();
+
+ stress(10, boost::posix_time::seconds(3));
}
BOOST_AUTO_TEST_SUITE_END()
@@ -334,6 +431,7 @@
BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected) {
// This tests THRIFT-2441 new behavior: stopping the server disconnects clients
+ BOOST_TEST_MESSAGE("Testing stop with interruptable clients");
startServer();
@@ -361,6 +459,7 @@
BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected) {
// This tests pre-THRIFT-2441 behavior: stopping the server blocks until clients
// disconnect.
+ BOOST_TEST_MESSAGE("Testing stop with uninterruptable clients");
boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())
->setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
@@ -389,12 +488,14 @@
// Once the clients disconnect the server will stop
stopServer();
+ BOOST_CHECK(pServer->getConcurrentClientCountHWM() > 0);
t1.join();
t2.join();
}
BOOST_AUTO_TEST_CASE(test_concurrent_client_limit) {
startServer();
+ BOOST_TEST_MESSAGE("Testing the concurrent client limit");
BOOST_CHECK_EQUAL(INT64_MAX, pServer->getConcurrentClientLimit());
pServer->setConcurrentClientLimit(2);
@@ -426,6 +527,7 @@
BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCountHWM());
stopServer();
+ BOOST_CHECK(pServer->getConcurrentClientCountHWM() > 0);
t2.join();
}