THRIFT-3084 add optional concurrent client limit enforcement to lib/cpp threaded servers
diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am
index 0cd1c67..3470abb 100755
--- a/lib/cpp/test/Makefile.am
+++ b/lib/cpp/test/Makefile.am
@@ -323,7 +323,7 @@
 gen-cpp/ChildService.cpp gen-cpp/ChildService.h gen-cpp/ParentService.cpp gen-cpp/ParentService.h gen-cpp/proc_types.cpp gen-cpp/proc_types.h: processor/proc.thrift
 	$(THRIFT) --gen cpp:templates,cob_style $<
 
-AM_CPPFLAGS = $(BOOST_CPPFLAGS) -I$(top_srcdir)/lib/cpp/src
+AM_CPPFLAGS = $(BOOST_CPPFLAGS) -I$(top_srcdir)/lib/cpp/src -D__STDC_LIMIT_MACROS
 AM_LDFLAGS = $(BOOST_LDFLAGS)
 AM_CXXFLAGS = -Wall -Wextra -pedantic
 
diff --git a/lib/cpp/test/TServerIntegrationTest.cpp b/lib/cpp/test/TServerIntegrationTest.cpp
index 9edeb19..73bcdba 100644
--- a/lib/cpp/test/TServerIntegrationTest.cpp
+++ b/lib/cpp/test/TServerIntegrationTest.cpp
@@ -20,9 +20,12 @@
 #define BOOST_TEST_MODULE TServerIntegrationTest
 #include <boost/test/auto_unit_test.hpp>
 #include <boost/bind.hpp>
+#include <boost/foreach.hpp>
 #include <boost/format.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/thread.hpp>
+#include <thrift/server/TSimpleServer.h>
+#include <thrift/server/TThreadPoolServer.h>
 #include <thrift/server/TThreadedServer.h>
 #include <thrift/protocol/TBinaryProtocol.h>
 #include <thrift/transport/TServerSocket.h>
@@ -44,12 +47,17 @@
 using apache::thrift::transport::TServerTransport;
 using apache::thrift::transport::TSocket;
 using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
 using apache::thrift::transport::TTransportFactory;
+using apache::thrift::server::TServer;
 using apache::thrift::server::TServerEventHandler;
+using apache::thrift::server::TSimpleServer;
+using apache::thrift::server::TThreadPoolServer;
 using apache::thrift::server::TThreadedServer;
 using apache::thrift::test::ParentServiceClient;
 using apache::thrift::test::ParentServiceIf;
 using apache::thrift::test::ParentServiceProcessor;
+using boost::posix_time::milliseconds;
 
 /**
  * preServe runs after listen() is successful, when we can connect
@@ -81,7 +89,10 @@
   uint64_t accepted_;
 };
 
-class ParentHandler : virtual public ParentServiceIf {
+/**
+ * Reusing another generated test, just something to serve up
+ */
+class ParentHandler : public ParentServiceIf {
 public:
   ParentHandler() : generation_(0) {}
 
@@ -123,11 +134,17 @@
   std::vector<std::string> strings_;
 };
 
+void autoSocketCloser(TSocket *pSock) {
+  pSock->close();
+  delete pSock;
+}
+
+template<class TServerType>
 class TServerIntegrationTestFixture : public TestPortFixture
 {
 public:
   TServerIntegrationTestFixture() :
-      pServer(new TThreadedServer(
+      pServer(new TServerType(
                     boost::shared_ptr<ParentServiceProcessor>(new ParentServiceProcessor(
                             boost::shared_ptr<ParentServiceIf>(new ParentHandler))),
                     boost::shared_ptr<TServerTransport>(new TServerSocket("localhost", m_serverPort)),
@@ -139,7 +156,7 @@
   }
 
   void startServer() {
-    pServerThread.reset(new boost::thread(boost::bind(&TThreadedServer::serve, pServer.get())));
+    pServerThread.reset(new boost::thread(boost::bind(&TServerType::serve, pServer.get())));
 
     // block until listen() completes so clients will be able to connect
     Synchronized sync(*(pEventHandler.get()));
@@ -160,52 +177,117 @@
   }
 
   void stopServer() {
-    pServer->stop();
-    BOOST_MESSAGE("server stop completed");
-    pServerThread->join();
-    BOOST_MESSAGE("server thread joined");
+    if (pServerThread) {
+      pServer->stop();
+      BOOST_MESSAGE("server stop completed");
+
+      pServerThread->join();
+      BOOST_MESSAGE("server thread joined");
+      pServerThread.reset();
+    }
   }
 
   ~TServerIntegrationTestFixture() {
     stopServer();
   }
 
-  void delayClose(boost::shared_ptr<TTransport> toClose) {
-    boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
+  void delayClose(boost::shared_ptr<TTransport> toClose, boost::posix_time::time_duration after) {
+    boost::this_thread::sleep(after);
     toClose->close();
   }
 
-  boost::shared_ptr<TThreadedServer> pServer;
+  void baseline(int64_t numToMake, int64_t expectedHWM) {
+    startServer();
+    std::vector<boost::shared_ptr<TSocket> > holdSockets;
+    std::vector<boost::shared_ptr<boost::thread> > holdThreads;
+
+    for (int64_t i = 0; i < numToMake; ++i) {
+        boost::shared_ptr<TSocket> pClientSock(new TSocket("localhost", m_serverPort), autoSocketCloser);
+        holdSockets.push_back(pClientSock);
+        boost::shared_ptr<TProtocol> pClientProtocol(new TBinaryProtocol(pClientSock));
+        ParentServiceClient client(pClientProtocol);
+        pClientSock->open();
+        client.incrementGeneration();
+        holdThreads.push_back(
+                boost::shared_ptr<boost::thread>(
+                        new boost::thread(
+                                boost::bind(&TServerIntegrationTestFixture::delayClose, this,
+                                            pClientSock, milliseconds(100 * numToMake)))));
+    }
+
+    BOOST_CHECK_EQUAL(expectedHWM, pServer->getConcurrentClientCountHWM());
+    stopServer();
+    BOOST_FOREACH(boost::shared_ptr<boost::thread> pThread, holdThreads) {
+        pThread->join();
+    }
+    holdThreads.clear();
+    holdSockets.clear();
+  }
+
+  boost::shared_ptr<TServerType> pServer;
   boost::shared_ptr<TServerReadyEventHandler> pEventHandler;
   boost::shared_ptr<boost::thread> pServerThread;
 };
 
-BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture )
+BOOST_FIXTURE_TEST_SUITE( Baseline, TestPortFixture )
 
-BOOST_AUTO_TEST_CASE(test_execute_one_request_and_close)
+BOOST_FIXTURE_TEST_CASE(test_simple, TServerIntegrationTestFixture<TSimpleServer>)
 {
-    // this test establishes some basic sanity
-
-    startServer();
-    boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
-    boost::shared_ptr<TProtocol> pClientProtocol1(new TBinaryProtocol(pClientSock1));
-    ParentServiceClient client1(pClientProtocol1);
-    pClientSock1->open();
-    client1.incrementGeneration();
-    pClientSock1->close();
-    stopServer();
+    baseline(3, 1);
 }
 
+BOOST_FIXTURE_TEST_CASE(test_threaded, TServerIntegrationTestFixture<TThreadedServer>)
+{
+    baseline(10, 10);
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threaded_bound, TServerIntegrationTestFixture<TThreadedServer>)
+{
+    pServer->setConcurrentClientLimit(4);
+    baseline(10, 4);
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threadpool, TServerIntegrationTestFixture<TThreadPoolServer>)
+{
+    pServer->getThreadManager()->threadFactory(
+            boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+                    new apache::thrift::concurrency::PlatformThreadFactory));
+    pServer->getThreadManager()->start();
+
+    // thread factory has 4 threads as a default
+    // thread factory however is a bad way to limit concurrent clients
+    // as accept() will be called to grab a 5th client socket, in this case
+    // and then the thread factory will block adding the thread to manage
+    // that client.
+    baseline(10, 5);
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threadpool_bound, TServerIntegrationTestFixture<TThreadPoolServer>)
+{
+    pServer->getThreadManager()->threadFactory(
+            boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+                    new apache::thrift::concurrency::PlatformThreadFactory));
+    pServer->getThreadManager()->start();
+    pServer->setConcurrentClientLimit(4);
+
+    baseline(10, 4);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+
+
+BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture<TThreadedServer> )
+
 BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected)
 {
     // This tests THRIFT-2441 new behavior: stopping the server disconnects clients
 
     startServer();
 
-    boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
+    boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser);
     pClientSock1->open();
 
-    boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
+    boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser);
     pClientSock2->open();
 
     // Ensure they have been accepted
@@ -219,8 +301,6 @@
     uint8_t buf[1];
     BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1));   // 0 = disconnected
     BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1));   // 0 = disconnected
-    pClientSock1->close();
-    pClientSock2->close();
 }
 
 BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected)
@@ -230,24 +310,56 @@
 
     boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())->
             setInterruptableChildren(false);    // returns to pre-THRIFT-2441 behavior
+
     startServer();
 
-    boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
+    boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser);
     pClientSock1->open();
 
-    boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
+    boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser);
     pClientSock2->open();
 
     // Ensure they have been accepted
     blockUntilAccepted(2);
 
-    boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1));
-    boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2));
+    boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1, milliseconds(250)));
+    boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2, milliseconds(250)));
 
     // Once the clients disconnect the server will stop
     stopServer();
-
-    pClientSock1->close();
-    pClientSock2->close();
+    t1.join();
+    t2.join();
 }
+
+BOOST_AUTO_TEST_CASE(test_concurrent_client_limit)
+{
+    startServer();
+
+    BOOST_CHECK_EQUAL(INT64_MAX, pServer->getConcurrentClientLimit());
+    pServer->setConcurrentClientLimit(2);
+    BOOST_CHECK_EQUAL(0, pServer->getConcurrentClientCount());
+    BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientLimit());
+
+    boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser);
+    pClientSock1->open();
+    blockUntilAccepted(1);
+    BOOST_CHECK_EQUAL(1, pServer->getConcurrentClientCount());
+
+    boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser);
+    pClientSock2->open();
+    blockUntilAccepted(2);
+    BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
+
+    // a third client cannot connect until one of the other two closes
+    boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2, milliseconds(250)));
+    boost::shared_ptr<TSocket> pClientSock3(new TSocket("localhost", m_serverPort), autoSocketCloser);
+    pClientSock2->open();
+    blockUntilAccepted(2);
+    BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
+    BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCountHWM());
+
+    stopServer();
+    t2.join();
+}
+
 BOOST_AUTO_TEST_SUITE_END()
diff --git a/lib/cpp/test/ZlibTest.cpp b/lib/cpp/test/ZlibTest.cpp
index bafacf9..465e12d 100644
--- a/lib/cpp/test/ZlibTest.cpp
+++ b/lib/cpp/test/ZlibTest.cpp
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-#define __STDC_LIMIT_MACROS
 #define __STDC_FORMAT_MACROS
 
 #ifndef _GNU_SOURCE