THRIFT-3084 add optional concurrent client limit enforcement to lib/cpp threaded servers
diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am
index 0cd1c67..3470abb 100755
--- a/lib/cpp/test/Makefile.am
+++ b/lib/cpp/test/Makefile.am
@@ -323,7 +323,7 @@
gen-cpp/ChildService.cpp gen-cpp/ChildService.h gen-cpp/ParentService.cpp gen-cpp/ParentService.h gen-cpp/proc_types.cpp gen-cpp/proc_types.h: processor/proc.thrift
$(THRIFT) --gen cpp:templates,cob_style $<
-AM_CPPFLAGS = $(BOOST_CPPFLAGS) -I$(top_srcdir)/lib/cpp/src
+AM_CPPFLAGS = $(BOOST_CPPFLAGS) -I$(top_srcdir)/lib/cpp/src -D__STDC_LIMIT_MACROS
AM_LDFLAGS = $(BOOST_LDFLAGS)
AM_CXXFLAGS = -Wall -Wextra -pedantic
diff --git a/lib/cpp/test/TServerIntegrationTest.cpp b/lib/cpp/test/TServerIntegrationTest.cpp
index 9edeb19..73bcdba 100644
--- a/lib/cpp/test/TServerIntegrationTest.cpp
+++ b/lib/cpp/test/TServerIntegrationTest.cpp
@@ -20,9 +20,12 @@
#define BOOST_TEST_MODULE TServerIntegrationTest
#include <boost/test/auto_unit_test.hpp>
#include <boost/bind.hpp>
+#include <boost/foreach.hpp>
#include <boost/format.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
+#include <thrift/server/TSimpleServer.h>
+#include <thrift/server/TThreadPoolServer.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TServerSocket.h>
@@ -44,12 +47,17 @@
using apache::thrift::transport::TServerTransport;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
using apache::thrift::transport::TTransportFactory;
+using apache::thrift::server::TServer;
using apache::thrift::server::TServerEventHandler;
+using apache::thrift::server::TSimpleServer;
+using apache::thrift::server::TThreadPoolServer;
using apache::thrift::server::TThreadedServer;
using apache::thrift::test::ParentServiceClient;
using apache::thrift::test::ParentServiceIf;
using apache::thrift::test::ParentServiceProcessor;
+using boost::posix_time::milliseconds;
/**
* preServe runs after listen() is successful, when we can connect
@@ -81,7 +89,10 @@
uint64_t accepted_;
};
-class ParentHandler : virtual public ParentServiceIf {
+/**
+ * Reusing another generated test, just something to serve up
+ */
+class ParentHandler : public ParentServiceIf {
public:
ParentHandler() : generation_(0) {}
@@ -123,11 +134,17 @@
std::vector<std::string> strings_;
};
+void autoSocketCloser(TSocket *pSock) {
+ pSock->close();
+ delete pSock;
+}
+
+template<class TServerType>
class TServerIntegrationTestFixture : public TestPortFixture
{
public:
TServerIntegrationTestFixture() :
- pServer(new TThreadedServer(
+ pServer(new TServerType(
boost::shared_ptr<ParentServiceProcessor>(new ParentServiceProcessor(
boost::shared_ptr<ParentServiceIf>(new ParentHandler))),
boost::shared_ptr<TServerTransport>(new TServerSocket("localhost", m_serverPort)),
@@ -139,7 +156,7 @@
}
void startServer() {
- pServerThread.reset(new boost::thread(boost::bind(&TThreadedServer::serve, pServer.get())));
+ pServerThread.reset(new boost::thread(boost::bind(&TServerType::serve, pServer.get())));
// block until listen() completes so clients will be able to connect
Synchronized sync(*(pEventHandler.get()));
@@ -160,52 +177,117 @@
}
void stopServer() {
- pServer->stop();
- BOOST_MESSAGE("server stop completed");
- pServerThread->join();
- BOOST_MESSAGE("server thread joined");
+ if (pServerThread) {
+ pServer->stop();
+ BOOST_MESSAGE("server stop completed");
+
+ pServerThread->join();
+ BOOST_MESSAGE("server thread joined");
+ pServerThread.reset();
+ }
}
~TServerIntegrationTestFixture() {
stopServer();
}
- void delayClose(boost::shared_ptr<TTransport> toClose) {
- boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
+ void delayClose(boost::shared_ptr<TTransport> toClose, boost::posix_time::time_duration after) {
+ boost::this_thread::sleep(after);
toClose->close();
}
- boost::shared_ptr<TThreadedServer> pServer;
+ void baseline(int64_t numToMake, int64_t expectedHWM) {
+ startServer();
+ std::vector<boost::shared_ptr<TSocket> > holdSockets;
+ std::vector<boost::shared_ptr<boost::thread> > holdThreads;
+
+ for (int64_t i = 0; i < numToMake; ++i) {
+ boost::shared_ptr<TSocket> pClientSock(new TSocket("localhost", m_serverPort), autoSocketCloser);
+ holdSockets.push_back(pClientSock);
+ boost::shared_ptr<TProtocol> pClientProtocol(new TBinaryProtocol(pClientSock));
+ ParentServiceClient client(pClientProtocol);
+ pClientSock->open();
+ client.incrementGeneration();
+ holdThreads.push_back(
+ boost::shared_ptr<boost::thread>(
+ new boost::thread(
+ boost::bind(&TServerIntegrationTestFixture::delayClose, this,
+ pClientSock, milliseconds(100 * numToMake)))));
+ }
+
+ BOOST_CHECK_EQUAL(expectedHWM, pServer->getConcurrentClientCountHWM());
+ stopServer();
+ BOOST_FOREACH(boost::shared_ptr<boost::thread> pThread, holdThreads) {
+ pThread->join();
+ }
+ holdThreads.clear();
+ holdSockets.clear();
+ }
+
+ boost::shared_ptr<TServerType> pServer;
boost::shared_ptr<TServerReadyEventHandler> pEventHandler;
boost::shared_ptr<boost::thread> pServerThread;
};
-BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture )
+BOOST_FIXTURE_TEST_SUITE( Baseline, TestPortFixture )
-BOOST_AUTO_TEST_CASE(test_execute_one_request_and_close)
+BOOST_FIXTURE_TEST_CASE(test_simple, TServerIntegrationTestFixture<TSimpleServer>)
{
- // this test establishes some basic sanity
-
- startServer();
- boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
- boost::shared_ptr<TProtocol> pClientProtocol1(new TBinaryProtocol(pClientSock1));
- ParentServiceClient client1(pClientProtocol1);
- pClientSock1->open();
- client1.incrementGeneration();
- pClientSock1->close();
- stopServer();
+ baseline(3, 1);
}
+BOOST_FIXTURE_TEST_CASE(test_threaded, TServerIntegrationTestFixture<TThreadedServer>)
+{
+ baseline(10, 10);
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threaded_bound, TServerIntegrationTestFixture<TThreadedServer>)
+{
+ pServer->setConcurrentClientLimit(4);
+ baseline(10, 4);
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threadpool, TServerIntegrationTestFixture<TThreadPoolServer>)
+{
+ pServer->getThreadManager()->threadFactory(
+ boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+ new apache::thrift::concurrency::PlatformThreadFactory));
+ pServer->getThreadManager()->start();
+
+ // thread factory has 4 threads as a default
+ // thread factory however is a bad way to limit concurrent clients
+ // as accept() will be called to grab a 5th client socket, in this case
+ // and then the thread factory will block adding the thread to manage
+ // that client.
+ baseline(10, 5);
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threadpool_bound, TServerIntegrationTestFixture<TThreadPoolServer>)
+{
+ pServer->getThreadManager()->threadFactory(
+ boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+ new apache::thrift::concurrency::PlatformThreadFactory));
+ pServer->getThreadManager()->start();
+ pServer->setConcurrentClientLimit(4);
+
+ baseline(10, 4);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+
+
+BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture<TThreadedServer> )
+
BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected)
{
// This tests THRIFT-2441 new behavior: stopping the server disconnects clients
startServer();
- boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
+ boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser);
pClientSock1->open();
- boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
+ boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser);
pClientSock2->open();
// Ensure they have been accepted
@@ -219,8 +301,6 @@
uint8_t buf[1];
BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1)); // 0 = disconnected
BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1)); // 0 = disconnected
- pClientSock1->close();
- pClientSock2->close();
}
BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected)
@@ -230,24 +310,56 @@
boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())->
setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
+
startServer();
- boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
+ boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser);
pClientSock1->open();
- boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
+ boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser);
pClientSock2->open();
// Ensure they have been accepted
blockUntilAccepted(2);
- boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1));
- boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2));
+ boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1, milliseconds(250)));
+ boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2, milliseconds(250)));
// Once the clients disconnect the server will stop
stopServer();
-
- pClientSock1->close();
- pClientSock2->close();
+ t1.join();
+ t2.join();
}
+
+BOOST_AUTO_TEST_CASE(test_concurrent_client_limit)
+{
+ startServer();
+
+ BOOST_CHECK_EQUAL(INT64_MAX, pServer->getConcurrentClientLimit());
+ pServer->setConcurrentClientLimit(2);
+ BOOST_CHECK_EQUAL(0, pServer->getConcurrentClientCount());
+ BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientLimit());
+
+ boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser);
+ pClientSock1->open();
+ blockUntilAccepted(1);
+ BOOST_CHECK_EQUAL(1, pServer->getConcurrentClientCount());
+
+ boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser);
+ pClientSock2->open();
+ blockUntilAccepted(2);
+ BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
+
+ // a third client cannot connect until one of the other two closes
+ boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2, milliseconds(250)));
+ boost::shared_ptr<TSocket> pClientSock3(new TSocket("localhost", m_serverPort), autoSocketCloser);
+ pClientSock2->open();
+ blockUntilAccepted(2);
+ BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
+ BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCountHWM());
+
+ stopServer();
+ t2.join();
+}
+
BOOST_AUTO_TEST_SUITE_END()
diff --git a/lib/cpp/test/ZlibTest.cpp b/lib/cpp/test/ZlibTest.cpp
index bafacf9..465e12d 100644
--- a/lib/cpp/test/ZlibTest.cpp
+++ b/lib/cpp/test/ZlibTest.cpp
@@ -17,7 +17,6 @@
* under the License.
*/
-#define __STDC_LIMIT_MACROS
#define __STDC_FORMAT_MACROS
#ifndef _GNU_SOURCE