THRIFT-3768 fix TThreadedServer refactoring issues with client lifetime guarantees
Client: C++
Patch: Jim King <jim.king@simplivity.com>

This closes #980
diff --git a/lib/cpp/src/thrift/concurrency/Thread.h b/lib/cpp/src/thrift/concurrency/Thread.h
index f5eb3a8..f7c7bd6 100644
--- a/lib/cpp/src/thrift/concurrency/Thread.h
+++ b/lib/cpp/src/thrift/concurrency/Thread.h
@@ -135,18 +135,32 @@
  * object for execution
  */
 class ThreadFactory {
-
 public:
   virtual ~ThreadFactory() {}
+
+  /**
+   * Gets current detached mode
+   */
+  virtual bool isDetached() const = 0;
+
+  /**
+   * Create a new thread.
+   */
   virtual boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const = 0;
 
-  /** Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread
+  /**
+   * Sets detached mode of threads
    */
+  virtual void setDetached(bool detached) = 0;
 
   static const Thread::id_t unknown_thread_id;
 
+  /**
+   * Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread
+   */
   virtual Thread::id_t getCurrentThreadId() const = 0;
 };
+
 }
 }
 } // apache::thrift::concurrency
diff --git a/lib/cpp/src/thrift/server/TServerFramework.cpp b/lib/cpp/src/thrift/server/TServerFramework.cpp
index 56b6cca..b62cf40 100644
--- a/lib/cpp/src/thrift/server/TServerFramework.cpp
+++ b/lib/cpp/src/thrift/server/TServerFramework.cpp
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+#include <algorithm>
 #include <boost/bind.hpp>
 #include <stdexcept>
 #include <stdint.h>
@@ -221,25 +222,25 @@
 }
 
 void TServerFramework::newlyConnectedClient(const boost::shared_ptr<TConnectedClient>& pClient) {
-  onClientConnected(pClient);
+  {
+    Synchronized sync(mon_);
+    ++clients_;
+    hwm_ = (std::max)(hwm_, clients_);
+  }
 
-  // Count a concurrent client added.
-  Synchronized sync(mon_);
-  ++clients_;
-  hwm_ = (std::max)(hwm_, clients_);
+  onClientConnected(pClient);
 }
 
 void TServerFramework::disposeConnectedClient(TConnectedClient* pClient) {
-  {
-    // Count a concurrent client removed.
-    Synchronized sync(mon_);
-    if (limit_ - --clients_ > 0) {
-      mon_.notify();
-    }
-  }
   onClientDisconnected(pClient);
   delete pClient;
+
+  Synchronized sync(mon_);
+  if (limit_ - --clients_ > 0) {
+    mon_.notify();
+  }
 }
+
 }
 }
 } // apache::thrift::server
diff --git a/lib/cpp/src/thrift/server/TServerFramework.h b/lib/cpp/src/thrift/server/TServerFramework.h
index a22688a..53d9bfd 100644
--- a/lib/cpp/src/thrift/server/TServerFramework.h
+++ b/lib/cpp/src/thrift/server/TServerFramework.h
@@ -123,8 +123,8 @@
 
 protected:
   /**
-   * A client has connected.  The implementation is responsible for storing
-   * and processing the client.  This is called during the serve() thread,
+   * A client has connected.  The implementation is responsible for managing the
+   * lifetime of the client object.  This is called during the serve() thread,
    * therefore a failure to return quickly will result in new client connection
    * delays.
    *
@@ -134,9 +134,10 @@
 
   /**
    * A client has disconnected.
-   * The server no longer tracks the client.
-   * The client TTransport has already been closed.
-   * The implementation must not delete the pointer.
+   * When called:
+   *   The server no longer tracks the client.
+   *   The client TTransport has already been closed.
+   *   The implementation must not delete the pointer.
    *
    * \param[in]  pClient  the disconnected client
    */
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp
index 92f5cf8..e15f8f1 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp
@@ -17,6 +17,11 @@
  * under the License.
  */
 
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/shared_ptr.hpp>
+#include <string>
 #include <thrift/concurrency/PlatformThreadFactory.h>
 #include <thrift/server/TThreadedServer.h>
 
@@ -24,6 +29,7 @@
 namespace thrift {
 namespace server {
 
+using apache::thrift::concurrency::Runnable;
 using apache::thrift::concurrency::Synchronized;
 using apache::thrift::concurrency::Thread;
 using apache::thrift::concurrency::ThreadFactory;
@@ -34,7 +40,6 @@
 using apache::thrift::transport::TTransportException;
 using apache::thrift::transport::TTransportFactory;
 using boost::shared_ptr;
-using std::string;
 
 TThreadedServer::TThreadedServer(const shared_ptr<TProcessorFactory>& processorFactory,
                                  const shared_ptr<TServerTransport>& serverTransport,
@@ -90,31 +95,68 @@
 }
 
 void TThreadedServer::serve() {
+  threadFactory_->setDetached(false);
   TServerFramework::serve();
 
-  // Drain all clients - no more will arrive
-  try {
-    Synchronized s(clientsMonitor_);
-    while (getConcurrentClientCount() > 0) {
-      clientsMonitor_.wait();
-    }
-  } catch (TException& tx) {
-    string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
-    GlobalOutput(errStr.c_str());
+  // Ensure post-condition of no active clients
+  Synchronized s(clientMonitor_);
+  while (!activeClientMap_.empty()) {
+    clientMonitor_.wait();
+  }
+
+  drainDeadClients();
+}
+
+void TThreadedServer::drainDeadClients() {
+  // precondition: the caller already holds clientMonitor_
+  while (!deadClientMap_.empty()) {
+    ClientMap::iterator it = deadClientMap_.begin();
+    it->second->join();
+    deadClientMap_.erase(it);
   }
 }
 
 void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) {
-  threadFactory_->newThread(pClient)->start();
+  Synchronized sync(clientMonitor_);
+  ClientMap::iterator it = activeClientMap_.insert(ClientMap::value_type(pClient.get(), boost::make_shared<TConnectedClientRunner>(pClient))).first;
+  boost::shared_ptr<apache::thrift::concurrency::Thread> pThread = threadFactory_->newThread(it->second);
+  it->second->setThread(pThread);
+  pThread->start();
 }
 
 void TThreadedServer::onClientDisconnected(TConnectedClient* pClient) {
-  THRIFT_UNUSED_VARIABLE(pClient);
-  Synchronized s(clientsMonitor_);
-  if (getConcurrentClientCount() == 0) {
-    clientsMonitor_.notify();
+  Synchronized sync(clientMonitor_);
+  drainDeadClients();	// use the outgoing thread to do some maintenance on our dead client backlog
+  ClientMap::iterator it = activeClientMap_.find(pClient);
+  ClientMap::iterator end = it;
+  deadClientMap_.insert(it, ++end);
+  activeClientMap_.erase(it);
+  if (activeClientMap_.empty()) {
+    clientMonitor_.notify();
   }
 }
+
+TThreadedServer::TConnectedClientRunner::TConnectedClientRunner(const boost::shared_ptr<TConnectedClient>& pClient)
+  : pClient_(pClient) {
+}
+
+TThreadedServer::TConnectedClientRunner::~TConnectedClientRunner() {
+}
+
+void TThreadedServer::TConnectedClientRunner::join() {
+  pThread_->join();
+}
+
+void TThreadedServer::TConnectedClientRunner::run() /* override */ {
+  pClient_->run();  // Run the client
+  pClient_.reset(); // The client is done - release it here rather than in the destructor for safety
+}
+
+void TThreadedServer::TConnectedClientRunner::setThread(
+		const boost::shared_ptr<apache::thrift::concurrency::Thread>& pThread) {
+  pThread_ = pThread;
+}
+
 }
 }
 } // apache::thrift::server
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h
index cdacfd7..0f2cce9 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.h
+++ b/lib/cpp/src/thrift/server/TThreadedServer.h
@@ -20,6 +20,7 @@
 #ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_
 #define _THRIFT_SERVER_TTHREADEDSERVER_H_ 1
 
+#include <map>
 #include <thrift/concurrency/Monitor.h>
 #include <thrift/concurrency/PlatformThreadFactory.h>
 #include <thrift/concurrency/Thread.h>
@@ -30,7 +31,11 @@
 namespace server {
 
 /**
- * Manage clients using a thread pool.
+ * Manage clients using threads - threads are created one for each client and are
+ * released when the client disconnects.  This server is used to make a dynamically
+ * scalable server up to the concurrent connection limit.
+ *
+ * The thread factory will be changed to a non-detached type.
  */
 class TThreadedServer : public TServerFramework {
 public:
@@ -83,12 +88,59 @@
   virtual void serve();
 
 protected:
+  /**
+   * Drain recently disconnected clients by joining their threads - this is done lazily because
+   * we cannot do it inside the thread context that is disconnecting.
+   */
+  virtual void drainDeadClients();
+
+  /**
+   * Implementation of TServerFramework::onClientConnected
+   */
   virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */;
-  virtual void onClientDisconnected(TConnectedClient* pClient) /* override */;
+
+  /**
+   * Implementation of TServerFramework::onClientDisconnected
+   */
+  virtual void onClientDisconnected(TConnectedClient *pClient) /* override */;
 
   boost::shared_ptr<apache::thrift::concurrency::ThreadFactory> threadFactory_;
-  apache::thrift::concurrency::Monitor clientsMonitor_;
+
+  /**
+   * A helper wrapper used to wrap the client in something we can use to maintain
+   * the lifetime of the connected client within its (non-detached) thread.  We cannot simply
+   * track the threads because a shared_ptr<Thread> hangs on to the Runnable it is
+   * passed, and TServerFramework requires the runnable (TConnectedClient) to be
+   * destroyed in order to work properly.
+   */
+  class TConnectedClientRunner : public apache::thrift::concurrency::Runnable
+  {
+  public:
+    TConnectedClientRunner(const boost::shared_ptr<TConnectedClient>& pClient);
+    virtual ~TConnectedClientRunner();
+    void join();
+    void run() /* override */;
+    void setThread(const boost::shared_ptr<apache::thrift::concurrency::Thread>& pThread);
+  private:
+    boost::shared_ptr<TConnectedClient> pClient_;
+    boost::shared_ptr<apache::thrift::concurrency::Thread> pThread_;
+  };
+
+  apache::thrift::concurrency::Monitor clientMonitor_;
+
+  typedef std::map<TConnectedClient *, boost::shared_ptr<TConnectedClientRunner> > ClientMap;
+
+  /**
+   * A map of active clients
+   */
+  ClientMap activeClientMap_;
+
+  /**
+   * A map of clients that have disconnected but their threads have not been joined
+   */
+  ClientMap deadClientMap_;
 };
+
 }
 }
 } // apache::thrift::server
diff --git a/lib/cpp/test/TServerIntegrationTest.cpp b/lib/cpp/test/TServerIntegrationTest.cpp
index ce1cbd3..2180448 100644
--- a/lib/cpp/test/TServerIntegrationTest.cpp
+++ b/lib/cpp/test/TServerIntegrationTest.cpp
@@ -19,7 +19,9 @@
 
 #define BOOST_TEST_MODULE TServerIntegrationTest
 #include <boost/test/auto_unit_test.hpp>
+#include <boost/atomic.hpp>
 #include <boost/bind.hpp>
+#include <boost/date_time/posix_time/ptime.hpp>
 #include <boost/foreach.hpp>
 #include <boost/format.hpp>
 #include <boost/make_shared.hpp>
@@ -33,6 +35,7 @@
 #include <thrift/transport/TSocket.h>
 #include <thrift/transport/TTransport.h>
 #include "gen-cpp/ParentService.h"
+#include <string>
 #include <vector>
 
 using apache::thrift::concurrency::Guard;
@@ -152,7 +155,10 @@
                                   new TServerSocket("localhost", 0)),
                               boost::shared_ptr<TTransportFactory>(new TTransportFactory),
                               boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
-      pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)) {
+      pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)),
+	  bStressDone(false),
+	  bStressConnectionCount(0),
+	  bStressRequestCount(0) {
     pServer->setServerEventHandler(pEventHandler);
   }
 
@@ -162,7 +168,10 @@
                           boost::shared_ptr<TServerTransport>(new TServerSocket("localhost", 0)),
                           boost::shared_ptr<TTransportFactory>(new TTransportFactory),
                           boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
-      pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)) {
+      pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)),
+      bStressDone(false),
+	  bStressConnectionCount(0),
+	  bStressRequestCount(0) {
     pServer->setServerEventHandler(pEventHandler);
   }
 
@@ -175,7 +184,7 @@
       pEventHandler->wait();
     }
 
-    BOOST_TEST_MESSAGE("server is listening");
+    BOOST_TEST_MESSAGE("  server is listening");
   }
 
   void blockUntilAccepted(uint64_t numAccepted) {
@@ -184,34 +193,35 @@
       pEventHandler->wait();
     }
 
-    BOOST_TEST_MESSAGE(boost::format("server has accepted %1%") % numAccepted);
+    BOOST_TEST_MESSAGE(boost::format("  server has accepted %1%") % numAccepted);
   }
 
   void stopServer() {
     if (pServerThread) {
       pServer->stop();
-      BOOST_TEST_MESSAGE("server stop completed");
+      BOOST_TEST_MESSAGE("  server stop completed");
 
       pServerThread->join();
-      BOOST_TEST_MESSAGE("server thread joined");
+      BOOST_TEST_MESSAGE("  server thread joined");
       pServerThread.reset();
     }
   }
 
   ~TServerIntegrationTestFixture() { stopServer(); }
 
-  int getServerPort() {
-    TServerSocket* pSock = dynamic_cast<TServerSocket*>(pServer->getServerTransport().get());
-    return pSock->getPort();
-  }
+  /**
+   * Performs a baseline test where some clients are opened and issue a single operation
+   * and then disconnect at different intervals.
+   * \param[in]  numToMake  the number of concurrent clients
+   * \param[in]  expectedHWM  the high water mark we expect of concurrency
+   * \param[in]  purpose  a description of the test for logging purposes
+   */
+  void baseline(int64_t numToMake, int64_t expectedHWM, const std::string& purpose) {
+	BOOST_TEST_MESSAGE(boost::format("Testing %1%: %2% with %3% clients, expect %4% HWM")
+						% typeid(TServerType).name() % purpose % numToMake % expectedHWM);
 
-  void delayClose(boost::shared_ptr<TTransport> toClose, boost::posix_time::time_duration after) {
-    boost::this_thread::sleep(after);
-    toClose->close();
-  }
+	startServer();
 
-  void baseline(int64_t numToMake, int64_t expectedHWM) {
-    startServer();
     std::vector<boost::shared_ptr<TSocket> > holdSockets;
     std::vector<boost::shared_ptr<boost::thread> > holdThreads;
 
@@ -227,19 +237,91 @@
           new boost::thread(boost::bind(&TServerIntegrationTestFixture::delayClose,
                                         this,
                                         pClientSock,
-                                        milliseconds(100 * numToMake)))));
+                                        milliseconds(10 * numToMake)))));
     }
 
     BOOST_CHECK_EQUAL(expectedHWM, pServer->getConcurrentClientCountHWM());
-    stopServer();
+
     BOOST_FOREACH (boost::shared_ptr<boost::thread> pThread, holdThreads) { pThread->join(); }
     holdThreads.clear();
     holdSockets.clear();
+
+    stopServer();
+  }
+
+  /**
+   * Helper method used to close a connection after a delay.
+   * \param[in]  toClose  the connection to close
+   * \param[in]  after  the delay to impose
+   */
+  void delayClose(boost::shared_ptr<TTransport> toClose, boost::posix_time::time_duration after) {
+    boost::this_thread::sleep(after);
+    toClose->close();
+  }
+
+  /**
+   * \returns  the server port number
+   */
+  int getServerPort() {
+    TServerSocket* pSock = dynamic_cast<TServerSocket*>(pServer->getServerTransport().get());
+    return pSock->getPort();
+  }
+
+  /**
+   * Performs a stress test by spawning threads that connect, issue a random number of
+   * operations and disconnect, over and over, for a fixed period to test concurrency.
+   * \param[in]  numToMake  the number of concurrent clients
+   * \param[in]  duration  how long to keep the stress test running
+   */
+  void stress(int64_t numToMake, const boost::posix_time::time_duration& duration) {
+    BOOST_TEST_MESSAGE(boost::format("Stress testing %1% with %2% clients for %3% seconds")
+        % typeid(TServerType).name() % numToMake % duration.total_seconds());
+
+    startServer();
+
+    std::vector<boost::shared_ptr<boost::thread> > holdThreads;
+    for (int64_t i = 0; i < numToMake; ++i) {
+      holdThreads.push_back(boost::shared_ptr<boost::thread>(
+        new boost::thread(boost::bind(&TServerIntegrationTestFixture::stressor, this))));
+    }
+
+    boost::this_thread::sleep(duration);
+    bStressDone = true;
+
+    BOOST_TEST_MESSAGE(boost::format("  serviced %1% connections (HWM %2%) totaling %3% requests")
+        % bStressConnectionCount % pServer->getConcurrentClientCountHWM() % bStressRequestCount);
+
+    BOOST_FOREACH (boost::shared_ptr<boost::thread> pThread, holdThreads) { pThread->join(); }
+    holdThreads.clear();
+
+    BOOST_CHECK(bStressRequestCount > 0);
+
+    stopServer();
+  }
+
+  /**
+   * Helper method to stress the system
+   */
+  void stressor() {
+	while (!bStressDone) {
+      boost::shared_ptr<TSocket> pSocket(new TSocket("localhost", getServerPort()), autoSocketCloser);
+      boost::shared_ptr<TProtocol> pProtocol(new TBinaryProtocol(pSocket));
+      ParentServiceClient client(pProtocol);
+      pSocket->open();
+      bStressConnectionCount.fetch_add(1, boost::memory_order_relaxed);
+      for (int i = 0; i < rand() % 1000; ++i) {
+    	client.incrementGeneration();
+        bStressRequestCount.fetch_add(1, boost::memory_order_relaxed);
+      }
+    }
   }
 
   boost::shared_ptr<TServerType> pServer;
   boost::shared_ptr<TServerReadyEventHandler> pEventHandler;
   boost::shared_ptr<boost::thread> pServerThread;
+  bool bStressDone;
+  boost::atomic_int64_t bStressConnectionCount;
+  boost::atomic_int64_t bStressRequestCount;
 };
 
 template <class TServerType>
@@ -264,26 +346,31 @@
 
 BOOST_FIXTURE_TEST_CASE(test_simple_factory,
                         TServerIntegrationProcessorFactoryTestFixture<TSimpleServer>) {
-  baseline(3, 1);
+  baseline(3, 1, "factory");
 }
 
 BOOST_FIXTURE_TEST_CASE(test_simple, TServerIntegrationProcessorTestFixture<TSimpleServer>) {
-  baseline(3, 1);
+  baseline(3, 1, "processor");
 }
 
 BOOST_FIXTURE_TEST_CASE(test_threaded_factory,
                         TServerIntegrationProcessorFactoryTestFixture<TThreadedServer>) {
-  baseline(10, 10);
+  baseline(10, 10, "factory");
 }
 
 BOOST_FIXTURE_TEST_CASE(test_threaded, TServerIntegrationProcessorTestFixture<TThreadedServer>) {
-  baseline(10, 10);
+  baseline(10, 10, "processor");
 }
 
 BOOST_FIXTURE_TEST_CASE(test_threaded_bound,
                         TServerIntegrationProcessorTestFixture<TThreadedServer>) {
   pServer->setConcurrentClientLimit(4);
-  baseline(10, 4);
+  baseline(10, 4, "limit by server framework");
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threaded_stress,
+                        TServerIntegrationProcessorFactoryTestFixture<TThreadedServer>) {
+  stress(10, boost::posix_time::seconds(3));
 }
 
 BOOST_FIXTURE_TEST_CASE(test_threadpool_factory,
@@ -298,7 +385,7 @@
   // as accept() will be called to grab a 5th client socket, in this case
   // and then the thread factory will block adding the thread to manage
   // that client.
-  baseline(10, 5);
+  baseline(10, 5, "limit by thread manager");
 }
 
 BOOST_FIXTURE_TEST_CASE(test_threadpool,
@@ -313,7 +400,7 @@
   // as accept() will be called to grab a 5th client socket, in this case
   // and then the thread factory will block adding the thread to manage
   // that client.
-  baseline(10, 5);
+  baseline(10, 5, "limit by thread manager");
 }
 
 BOOST_FIXTURE_TEST_CASE(test_threadpool_bound,
@@ -324,7 +411,17 @@
   pServer->getThreadManager()->start();
   pServer->setConcurrentClientLimit(4);
 
-  baseline(10, 4);
+  baseline(10, 4, "server framework connection limit");
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threadpool_stress,
+                        TServerIntegrationProcessorTestFixture<TThreadPoolServer>) {
+  pServer->getThreadManager()->threadFactory(
+      boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+          new apache::thrift::concurrency::PlatformThreadFactory));
+  pServer->getThreadManager()->start();
+
+  stress(10, boost::posix_time::seconds(3));
 }
 
 BOOST_AUTO_TEST_SUITE_END()
@@ -334,6 +431,7 @@
 
 BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected) {
   // This tests THRIFT-2441 new behavior: stopping the server disconnects clients
+  BOOST_TEST_MESSAGE("Testing stop with interruptable clients");
 
   startServer();
 
@@ -361,6 +459,7 @@
 BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected) {
   // This tests pre-THRIFT-2441 behavior: stopping the server blocks until clients
   // disconnect.
+	  BOOST_TEST_MESSAGE("Testing stop with uninterruptable clients");
 
   boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())
       ->setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
@@ -389,12 +488,14 @@
 
   // Once the clients disconnect the server will stop
   stopServer();
+  BOOST_CHECK(pServer->getConcurrentClientCountHWM() > 0);
   t1.join();
   t2.join();
 }
 
 BOOST_AUTO_TEST_CASE(test_concurrent_client_limit) {
   startServer();
+  BOOST_TEST_MESSAGE("Testing the concurrent client limit");
 
   BOOST_CHECK_EQUAL(INT64_MAX, pServer->getConcurrentClientLimit());
   pServer->setConcurrentClientLimit(2);
@@ -426,6 +527,7 @@
   BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCountHWM());
 
   stopServer();
+  BOOST_CHECK(pServer->getConcurrentClientCountHWM() > 0);
   t2.join();
 }