THRIFT-3084 add optional concurrent client limit enforcement to lib/cpp threaded servers
diff --git a/lib/c_glib/test/testthrifttestclient.cpp b/lib/c_glib/test/testthrifttestclient.cpp
index 4f7bc08..d387396 100755
--- a/lib/c_glib/test/testthrifttestclient.cpp
+++ b/lib/c_glib/test/testthrifttestclient.cpp
@@ -317,6 +317,8 @@
// C CLIENT
extern "C" {
+#undef THRIFT_SOCKET /* from lib/cpp */
+
#include "t_test_thrift_test.h"
#include "t_test_thrift_test_types.h"
#include <thrift/c_glib/transport/thrift_socket.h>
diff --git a/lib/cpp/CMakeLists.txt b/lib/cpp/CMakeLists.txt
index 8ea0546..b444c35 100755
--- a/lib/cpp/CMakeLists.txt
+++ b/lib/cpp/CMakeLists.txt
@@ -35,9 +35,11 @@
src/thrift/Thrift.cpp
src/thrift/TApplicationException.cpp
src/thrift/VirtualProfiling.cpp
+ src/thrift/async/TAsyncChannel.cpp
src/thrift/concurrency/ThreadManager.cpp
src/thrift/concurrency/TimerManager.cpp
src/thrift/concurrency/Util.cpp
+ src/thrift/processor/PeekProcessor.cpp
src/thrift/protocol/TDebugProtocol.cpp
src/thrift/protocol/TDenseProtocol.cpp
src/thrift/protocol/TJSONProtocol.cpp
@@ -60,8 +62,6 @@
src/thrift/server/TSimpleServer.cpp
src/thrift/server/TThreadPoolServer.cpp
src/thrift/server/TThreadedServer.cpp
- src/thrift/async/TAsyncChannel.cpp
- src/thrift/processor/PeekProcessor.cpp
)
# This files don't work on Windows CE as there is no pipe support
@@ -185,6 +185,8 @@
add_definitions("-DUNICODE -D_UNICODE")
endif()
+add_definitions("-D__STDC_LIMIT_MACROS")
+
# Install the headers
install(DIRECTORY "src/thrift" DESTINATION "${INCLUDE_INSTALL_DIR}"
FILES_MATCHING PATTERN "*.h" PATTERN "*.tcc")
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index 28ff7c8..0de8dc7 100755
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -57,7 +57,7 @@
endif
AM_CXXFLAGS = -Wall -Wextra -pedantic
-AM_CPPFLAGS = $(BOOST_CPPFLAGS) $(OPENSSL_INCLUDES) -I$(srcdir)/src
+AM_CPPFLAGS = $(BOOST_CPPFLAGS) $(OPENSSL_INCLUDES) -I$(srcdir)/src -D__STDC_LIMIT_MACROS
AM_LDFLAGS = $(BOOST_LDFLAGS) $(OPENSSL_LDFLAGS)
# Define the source files for the module
@@ -65,9 +65,11 @@
libthrift_la_SOURCES = src/thrift/Thrift.cpp \
src/thrift/TApplicationException.cpp \
src/thrift/VirtualProfiling.cpp \
+ src/thrift/async/TAsyncChannel.cpp \
src/thrift/concurrency/ThreadManager.cpp \
src/thrift/concurrency/TimerManager.cpp \
src/thrift/concurrency/Util.cpp \
+ src/thrift/processor/PeekProcessor.cpp \
src/thrift/protocol/TDebugProtocol.cpp \
src/thrift/protocol/TDenseProtocol.cpp \
src/thrift/protocol/TJSONProtocol.cpp \
@@ -94,9 +96,7 @@
src/thrift/server/TServerFramework.cpp \
src/thrift/server/TSimpleServer.cpp \
src/thrift/server/TThreadPoolServer.cpp \
- src/thrift/server/TThreadedServer.cpp \
- src/thrift/async/TAsyncChannel.cpp \
- src/thrift/processor/PeekProcessor.cpp
+ src/thrift/server/TThreadedServer.cpp
if WITH_BOOSTTHREADS
libthrift_la_SOURCES += src/thrift/concurrency/BoostThreadFactory.cpp \
diff --git a/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp b/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp
index 583b630..259c68e 100644
--- a/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp
+++ b/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp
@@ -87,7 +87,6 @@
absent and prefix them with an 0x01 byte if they are present
*/
-#define __STDC_LIMIT_MACROS
#include <stdint.h>
#include <thrift/protocol/TDenseProtocol.h>
#include <thrift/TReflectionLocal.h>
diff --git a/lib/cpp/src/thrift/server/TServerFramework.cpp b/lib/cpp/src/thrift/server/TServerFramework.cpp
index 8adb29a..36dab5b 100644
--- a/lib/cpp/src/thrift/server/TServerFramework.cpp
+++ b/lib/cpp/src/thrift/server/TServerFramework.cpp
@@ -18,12 +18,15 @@
*/
#include <boost/bind.hpp>
+#include <stdexcept>
+#include <stdint.h>
#include <thrift/server/TServerFramework.h>
namespace apache {
namespace thrift {
namespace server {
+using apache::thrift::concurrency::Synchronized;
using apache::thrift::transport::TServerTransport;
using apache::thrift::transport::TTransport;
using apache::thrift::transport::TTransportException;
@@ -39,14 +42,20 @@
const shared_ptr<TServerTransport>& serverTransport,
const shared_ptr<TTransportFactory>& transportFactory,
const shared_ptr<TProtocolFactory>& protocolFactory)
- : TServer(processorFactory, serverTransport, transportFactory, protocolFactory) {}
+ : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
+ clients_(0),
+ hwm_(0),
+ limit_(INT64_MAX) {}
TServerFramework::TServerFramework(
const shared_ptr<TProcessor>& processor,
const shared_ptr<TServerTransport>& serverTransport,
const shared_ptr<TTransportFactory>& transportFactory,
const shared_ptr<TProtocolFactory>& protocolFactory)
- : TServer(processor, serverTransport, transportFactory, protocolFactory) {}
+ : TServer(processor, serverTransport, transportFactory, protocolFactory),
+ clients_(0),
+ hwm_(0),
+ limit_(INT64_MAX) {}
TServerFramework::TServerFramework(
const shared_ptr<TProcessorFactory>& processorFactory,
@@ -57,7 +66,10 @@
const shared_ptr<TProtocolFactory>& outputProtocolFactory)
: TServer(processorFactory, serverTransport,
inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory) {}
+ inputProtocolFactory, outputProtocolFactory),
+ clients_(0),
+ hwm_(0),
+ limit_(INT64_MAX) {}
TServerFramework::TServerFramework(
const shared_ptr<TProcessor>& processor,
@@ -68,7 +80,10 @@
const shared_ptr<TProtocolFactory>& outputProtocolFactory)
: TServer(processor, serverTransport,
inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory) {}
+ inputProtocolFactory, outputProtocolFactory),
+ clients_(0),
+ hwm_(0),
+ limit_(INT64_MAX) {}
TServerFramework::~TServerFramework() {}
@@ -111,6 +126,16 @@
inputTransport.reset();
client.reset();
+ // If we have reached the limit on the number of concurrent
+ // clients allowed, wait for one or more clients to drain before
+ // accepting another.
+ {
+ Synchronized sync(mon_);
+ while (clients_ >= limit_) {
+ mon_.wait();
+ }
+ }
+
client = serverTransport_->accept();
inputTransport = inputTransportFactory_->getTransport(client);
@@ -118,11 +143,12 @@
inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
- onClientConnected(
+ newlyConnectedClient(
shared_ptr<TConnectedClient>(
new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client),
inputProtocol, outputProtocol, eventHandler_, client),
bind(&TServerFramework::disposeConnectedClient, this, _1)));
+
} catch (TTransportException& ttx) {
releaseOneDescriptor("inputTransport", inputTransport);
releaseOneDescriptor("outputTransport", outputTransport);
@@ -147,12 +173,54 @@
releaseOneDescriptor("serverTransport", serverTransport_);
}
+int64_t TServerFramework::getConcurrentClientLimit() const {
+ Synchronized sync(mon_); // setConcurrentClientLimit writes limit_ under this same monitor
+ return limit_;
+}
+
+int64_t TServerFramework::getConcurrentClientCount() const {
+ Synchronized sync(mon_); // clients_ is incremented and decremented under this same monitor
+ return clients_;
+}
+
+int64_t TServerFramework::getConcurrentClientCountHWM() const {
+ Synchronized sync(mon_); // hwm_ is raised under this same monitor whenever a client is counted
+ return hwm_;
+}
+
+void TServerFramework::setConcurrentClientLimit(int64_t newLimit) {
+  if (newLimit < 1) { // serve() waits while clients_ >= limit_, so a limit below 1 would never admit anyone
+    throw std::invalid_argument("newLimit must be greater than zero");
+  }
+  Synchronized sync(mon_);
+  limit_ = newLimit;
+  if (clients_ < limit_) { // direct comparison: limit_ - clients_ overflows int64_t if clients_ is ever negative
+    mon_.notify();         // there is room under the new limit; wake serve() if it is waiting on mon_
+  }
+}
+
void TServerFramework::stop() {
serverTransport_->interrupt();
serverTransport_->interruptChildren();
}
+void TServerFramework::newlyConnectedClient(const boost::shared_ptr<TConnectedClient>& pClient) {
+  { // Count first, so the decrement in disposeConnectedClient can never run without this increment.
+    Synchronized sync(mon_);
+    ++clients_;
+    hwm_ = std::max(hwm_, clients_);
+  } // mon_ is released here: the hand-off below must not keep disposeConnectedClient waiting on it
+  onClientConnected(pClient);
+}
+
void TServerFramework::disposeConnectedClient(TConnectedClient *pClient) {
+  {
+    // Count a concurrent client removed.
+    Synchronized sync(mon_);
+    if (--clients_ < limit_) { // direct comparison: limit_ - clients_ overflows int64_t if clients_ is ever negative
+      mon_.notify();           // a slot is free; wake serve() if it is waiting on mon_
+    }
+  }
onClientDisconnected(pClient);
delete pClient;
}
diff --git a/lib/cpp/src/thrift/server/TServerFramework.h b/lib/cpp/src/thrift/server/TServerFramework.h
index 67d5420..3f16dd1 100644
--- a/lib/cpp/src/thrift/server/TServerFramework.h
+++ b/lib/cpp/src/thrift/server/TServerFramework.h
@@ -21,7 +21,9 @@
#define _THRIFT_SERVER_TSERVERFRAMEWORK_H_ 1
#include <boost/shared_ptr.hpp>
+#include <stdint.h>
#include <thrift/TProcessor.h>
+#include <thrift/concurrency/Monitor.h>
#include <thrift/server/TConnectedClient.h>
#include <thrift/server/TServer.h>
#include <thrift/transport/TServerTransport.h>
@@ -89,6 +91,36 @@
*/
virtual void stop();
+ /**
+ * Get the concurrent client limit.
+ * \returns the concurrent client limit
+ */
+ virtual int64_t getConcurrentClientLimit() const;
+
+ /**
+ * Get the number of currently connected clients.
+ * \returns the number of currently connected clients
+ */
+ virtual int64_t getConcurrentClientCount() const;
+
+ /**
+ * Get the highest number of concurrent clients.
+ * \returns the highest number of concurrent clients
+ */
+ virtual int64_t getConcurrentClientCountHWM() const;
+
+ /**
+ * Set the concurrent client limit. This can be changed while
+ * the server is serving however it will not necessarily be
+ * enforced until the next client is accepted and added. If the
+ * limit is lowered below the number of connected clients, no
+ * action is taken to disconnect the clients.
+ * The default value used if this is not called is INT64_MAX.
+ * \param[in] newLimit the new limit of concurrent clients
+ * \throws std::invalid_argument if newLimit is less than 1
+ */
+ virtual void setConcurrentClientLimit(int64_t newLimit);
+
protected:
/**
* A client has connected. The implementation is responsible for storing
@@ -102,6 +134,7 @@
/**
* A client has disconnected.
+ * The server no longer tracks the client.
* The client TTransport has already been closed.
* The implementation must not delete the pointer.
*
@@ -111,10 +144,37 @@
private:
/**
+ * Common handling for new connected clients: calls onClientConnected and
+ * updates the concurrent client count and high water mark. The limit itself
+ * is enforced in serve(), which waits before accept() while it is reached.
+ */
+ void newlyConnectedClient(const boost::shared_ptr<TConnectedClient>& pClient);
+
+ /**
* Smart pointer client deletion.
* Calls onClientDisconnected and then deletes pClient.
*/
void disposeConnectedClient(TConnectedClient *pClient);
+
+ /**
+ * Monitor for limiting the number of concurrent clients.
+ */
+ apache::thrift::concurrency::Monitor mon_;
+
+ /**
+ * The number of concurrent clients.
+ */
+ int64_t clients_;
+
+ /**
+ * The high water mark of concurrent clients.
+ */
+ int64_t hwm_;
+
+ /**
+ * The limit on the number of concurrent clients.
+ */
+ int64_t limit_;
};
}
diff --git a/lib/cpp/src/thrift/server/TSimpleServer.cpp b/lib/cpp/src/thrift/server/TSimpleServer.cpp
index a133c0d..adcedc8 100644
--- a/lib/cpp/src/thrift/server/TSimpleServer.cpp
+++ b/lib/cpp/src/thrift/server/TSimpleServer.cpp
@@ -38,7 +38,9 @@
const shared_ptr<TTransportFactory>& transportFactory,
const shared_ptr<TProtocolFactory>& protocolFactory)
: TServerFramework(processorFactory, serverTransport,
- transportFactory, protocolFactory) {}
+ transportFactory, protocolFactory) {
+ TServerFramework::setConcurrentClientLimit(1);
+}
TSimpleServer::TSimpleServer(
const shared_ptr<TProcessor>& processor,
@@ -46,7 +48,9 @@
const shared_ptr<TTransportFactory>& transportFactory,
const shared_ptr<TProtocolFactory>& protocolFactory)
: TServerFramework(processor, serverTransport,
- transportFactory, protocolFactory) {}
+ transportFactory, protocolFactory) {
+ TServerFramework::setConcurrentClientLimit(1);
+}
TSimpleServer::TSimpleServer(
const shared_ptr<TProcessorFactory>& processorFactory,
@@ -57,7 +61,9 @@
const shared_ptr<TProtocolFactory>& outputProtocolFactory)
: TServerFramework(processorFactory, serverTransport,
inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory) {}
+ inputProtocolFactory, outputProtocolFactory) {
+ TServerFramework::setConcurrentClientLimit(1);
+}
TSimpleServer::TSimpleServer(
const shared_ptr<TProcessor>& processor,
@@ -68,7 +74,9 @@
const shared_ptr<TProtocolFactory>& outputProtocolFactory)
: TServerFramework(processor, serverTransport,
inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory) {}
+ inputProtocolFactory, outputProtocolFactory) {
+ TServerFramework::setConcurrentClientLimit(1);
+}
TSimpleServer::~TSimpleServer() {}
@@ -86,6 +94,13 @@
*/
void TSimpleServer::onClientDisconnected(TConnectedClient *pClient) {}
+/**
+ * Deliberate no-op: the simple server cannot have more than one client at a
+ * time, so requests to change the limit are ignored; the constructors pin it to 1.
+ */
+void TSimpleServer::setConcurrentClientLimit(int64_t newLimit) {}
+
+
}
}
} // apache::thrift::server
diff --git a/lib/cpp/src/thrift/server/TSimpleServer.h b/lib/cpp/src/thrift/server/TSimpleServer.h
index 51b00e4..30d5046 100644
--- a/lib/cpp/src/thrift/server/TSimpleServer.h
+++ b/lib/cpp/src/thrift/server/TSimpleServer.h
@@ -62,6 +62,9 @@
protected:
virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */;
virtual void onClientDisconnected(TConnectedClient *pClient) /* override */;
+
+private:
+ void setConcurrentClientLimit(int64_t newLimit); // hide
};
}
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
index a5f8c76..5b9b01d 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
@@ -112,6 +112,10 @@
taskExpiration_ = value;
}
+boost::shared_ptr<apache::thrift::concurrency::ThreadManager> TThreadPoolServer::getThreadManager() const {
+ return threadManager_; // the same manager onClientConnected adds clients to, not a copy of it
+}
+
void TThreadPoolServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) {
threadManager_->add(pClient, timeout_, taskExpiration_);
}
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.h b/lib/cpp/src/thrift/server/TThreadPoolServer.h
index 29e9aaf..267dbad 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.h
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.h
@@ -37,14 +37,16 @@
const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory,
const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory,
- const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager);
+ const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager =
+ apache::thrift::concurrency::ThreadManager::newSimpleThreadManager());
TThreadPoolServer(
const boost::shared_ptr<apache::thrift::TProcessor>& processor,
const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory,
const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory,
- const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager);
+ const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager =
+ apache::thrift::concurrency::ThreadManager::newSimpleThreadManager());
TThreadPoolServer(
const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory,
@@ -53,7 +55,8 @@
const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory,
const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory,
const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory,
- const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager);
+ const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager =
+ apache::thrift::concurrency::ThreadManager::newSimpleThreadManager());
TThreadPoolServer(
const boost::shared_ptr<apache::thrift::TProcessor>& processor,
@@ -62,7 +65,8 @@
const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory,
const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory,
const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory,
- const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager);
+ const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager =
+ apache::thrift::concurrency::ThreadManager::newSimpleThreadManager());
virtual ~TThreadPoolServer();
@@ -78,6 +82,8 @@
virtual int64_t getTaskExpiration() const;
virtual void setTaskExpiration(int64_t value);
+ virtual boost::shared_ptr<apache::thrift::concurrency::ThreadManager> getThreadManager() const;
+
protected:
virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */;
virtual void onClientDisconnected(TConnectedClient *pClient) /* override */;
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp
index 440cede..b0b22c3 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp
@@ -89,7 +89,7 @@
// Drain all clients - no more will arrive
try {
Synchronized s(clientsMonitor_);
- while (!clients_.empty()) {
+ while (getConcurrentClientCount() > 0) {
clientsMonitor_.wait();
}
} catch (TException& tx) {
@@ -98,27 +98,14 @@
}
}
-void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient)
-{
- // Create a thread for this client
- shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(pClient));
-
- // Insert thread into the set of threads
- {
- Synchronized s(clientsMonitor_);
- clients_.insert(pClient.get());
- }
-
- // Start the thread!
- thread->start();
+void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) {
+ threadFactory_->newThread(pClient)->start(); // create and start one thread for this client
}
void TThreadedServer::onClientDisconnected(TConnectedClient *pClient) {
- // Remove this task from parent bookkeeping
Synchronized s(clientsMonitor_);
- clients_.erase(pClient);
- if (clients_.empty()) {
- clientsMonitor_.notify();
+ if (getConcurrentClientCount() == 0) { // last client gone: wake the drain loop waiting on clientsMonitor_
+ clientsMonitor_.notify();
}
}
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h
index 7b66f1d..21b6a28 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.h
+++ b/lib/cpp/src/thrift/server/TThreadedServer.h
@@ -29,8 +29,6 @@
namespace thrift {
namespace server {
-#define THRIFT_DEFAULT_THREAD_FACTORY
-
/**
* Manage clients using a thread pool.
*/
@@ -86,7 +84,6 @@
boost::shared_ptr<apache::thrift::concurrency::ThreadFactory> threadFactory_;
apache::thrift::concurrency::Monitor clientsMonitor_;
- std::set<TConnectedClient*> clients_;
};
}
diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am
index 0cd1c67..3470abb 100755
--- a/lib/cpp/test/Makefile.am
+++ b/lib/cpp/test/Makefile.am
@@ -323,7 +323,7 @@
gen-cpp/ChildService.cpp gen-cpp/ChildService.h gen-cpp/ParentService.cpp gen-cpp/ParentService.h gen-cpp/proc_types.cpp gen-cpp/proc_types.h: processor/proc.thrift
$(THRIFT) --gen cpp:templates,cob_style $<
-AM_CPPFLAGS = $(BOOST_CPPFLAGS) -I$(top_srcdir)/lib/cpp/src
+AM_CPPFLAGS = $(BOOST_CPPFLAGS) -I$(top_srcdir)/lib/cpp/src -D__STDC_LIMIT_MACROS
AM_LDFLAGS = $(BOOST_LDFLAGS)
AM_CXXFLAGS = -Wall -Wextra -pedantic
diff --git a/lib/cpp/test/TServerIntegrationTest.cpp b/lib/cpp/test/TServerIntegrationTest.cpp
index 9edeb19..73bcdba 100644
--- a/lib/cpp/test/TServerIntegrationTest.cpp
+++ b/lib/cpp/test/TServerIntegrationTest.cpp
@@ -20,9 +20,12 @@
#define BOOST_TEST_MODULE TServerIntegrationTest
#include <boost/test/auto_unit_test.hpp>
#include <boost/bind.hpp>
+#include <boost/foreach.hpp>
#include <boost/format.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
+#include <thrift/server/TSimpleServer.h>
+#include <thrift/server/TThreadPoolServer.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TServerSocket.h>
@@ -44,12 +47,17 @@
using apache::thrift::transport::TServerTransport;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
using apache::thrift::transport::TTransportFactory;
+using apache::thrift::server::TServer;
using apache::thrift::server::TServerEventHandler;
+using apache::thrift::server::TSimpleServer;
+using apache::thrift::server::TThreadPoolServer;
using apache::thrift::server::TThreadedServer;
using apache::thrift::test::ParentServiceClient;
using apache::thrift::test::ParentServiceIf;
using apache::thrift::test::ParentServiceProcessor;
+using boost::posix_time::milliseconds;
/**
* preServe runs after listen() is successful, when we can connect
@@ -81,7 +89,10 @@
uint64_t accepted_;
};
-class ParentHandler : virtual public ParentServiceIf {
+/**
+ * Reusing another generated test, just something to serve up
+ */
+class ParentHandler : public ParentServiceIf {
public:
ParentHandler() : generation_(0) {}
@@ -123,11 +134,17 @@
std::vector<std::string> strings_;
};
+void autoSocketCloser(TSocket *pSock) {
+ pSock->close(); // close the connection explicitly before the object is freed
+ delete pSock; // passed as the boost::shared_ptr deleter, so this is the only delete for pSock
+}
+
+template<class TServerType>
class TServerIntegrationTestFixture : public TestPortFixture
{
public:
TServerIntegrationTestFixture() :
- pServer(new TThreadedServer(
+ pServer(new TServerType(
boost::shared_ptr<ParentServiceProcessor>(new ParentServiceProcessor(
boost::shared_ptr<ParentServiceIf>(new ParentHandler))),
boost::shared_ptr<TServerTransport>(new TServerSocket("localhost", m_serverPort)),
@@ -139,7 +156,7 @@
}
void startServer() {
- pServerThread.reset(new boost::thread(boost::bind(&TThreadedServer::serve, pServer.get())));
+ pServerThread.reset(new boost::thread(boost::bind(&TServerType::serve, pServer.get())));
// block until listen() completes so clients will be able to connect
Synchronized sync(*(pEventHandler.get()));
@@ -160,52 +177,117 @@
}
void stopServer() {
- pServer->stop();
- BOOST_MESSAGE("server stop completed");
- pServerThread->join();
- BOOST_MESSAGE("server thread joined");
+ if (pServerThread) { // nothing to do when there is no server thread (never started, or already stopped)
+ pServer->stop();
+ BOOST_MESSAGE("server stop completed");
+
+ pServerThread->join();
+ BOOST_MESSAGE("server thread joined");
+ pServerThread.reset(); // makes the destructor's later stopServer() call a no-op
+ }
}
~TServerIntegrationTestFixture() {
stopServer();
}
- void delayClose(boost::shared_ptr<TTransport> toClose) {
- boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
+ void delayClose(boost::shared_ptr<TTransport> toClose, boost::posix_time::time_duration after) {
+ boost::this_thread::sleep(after); // keep the transport open for the caller-chosen duration, then close it
toClose->close();
}
- boost::shared_ptr<TThreadedServer> pServer;
+ void baseline(int64_t numToMake, int64_t expectedHWM) { // connect numToMake clients in turn, then check the server's HWM
+ startServer();
+ std::vector<boost::shared_ptr<TSocket> > holdSockets;
+ std::vector<boost::shared_ptr<boost::thread> > holdThreads;
+
+ for (int64_t i = 0; i < numToMake; ++i) {
+ boost::shared_ptr<TSocket> pClientSock(new TSocket("localhost", m_serverPort), autoSocketCloser);
+ holdSockets.push_back(pClientSock);
+ boost::shared_ptr<TProtocol> pClientProtocol(new TBinaryProtocol(pClientSock));
+ ParentServiceClient client(pClientProtocol);
+ pClientSock->open();
+ client.incrementGeneration(); // NOTE(review): presumably a two-way call that returns once the server has serviced this client - confirm
+ holdThreads.push_back(
+ boost::shared_ptr<boost::thread>(
+ new boost::thread(
+ boost::bind(&TServerIntegrationTestFixture::delayClose, this,
+ pClientSock, milliseconds(100 * numToMake))))); // each client is closed 100ms x numToMake after its request
+ }
+
+ BOOST_CHECK_EQUAL(expectedHWM, pServer->getConcurrentClientCountHWM());
+ stopServer(); // stop the server first, then wait for the delayed-close threads
+ BOOST_FOREACH(boost::shared_ptr<boost::thread> pThread, holdThreads) {
+ pThread->join();
+ }
+ holdThreads.clear();
+ holdSockets.clear();
+ }
+
+ boost::shared_ptr<TServerType> pServer;
boost::shared_ptr<TServerReadyEventHandler> pEventHandler;
boost::shared_ptr<boost::thread> pServerThread;
};
-BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture )
+BOOST_FIXTURE_TEST_SUITE( Baseline, TestPortFixture )
-BOOST_AUTO_TEST_CASE(test_execute_one_request_and_close)
+BOOST_FIXTURE_TEST_CASE(test_simple, TServerIntegrationTestFixture<TSimpleServer>)
{
- // this test establishes some basic sanity
-
- startServer();
- boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
- boost::shared_ptr<TProtocol> pClientProtocol1(new TBinaryProtocol(pClientSock1));
- ParentServiceClient client1(pClientProtocol1);
- pClientSock1->open();
- client1.incrementGeneration();
- pClientSock1->close();
- stopServer();
+ baseline(3, 1); // TSimpleServer pins its own limit to 1, so the expected high water mark is 1
}
+BOOST_FIXTURE_TEST_CASE(test_threaded, TServerIntegrationTestFixture<TThreadedServer>)
+{
+ baseline(10, 10); // no limit set (default INT64_MAX): all ten clients are expected to be connected together
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threaded_bound, TServerIntegrationTestFixture<TThreadedServer>)
+{
+ pServer->setConcurrentClientLimit(4); // set before baseline() starts the server
+ baseline(10, 4); // ten clients are attempted, but the high water mark must stop at the limit
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threadpool, TServerIntegrationTestFixture<TThreadPoolServer>)
+{
+ pServer->getThreadManager()->threadFactory(
+ boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+ new apache::thrift::concurrency::PlatformThreadFactory));
+ pServer->getThreadManager()->start();
+
+ // thread manager has 4 threads as a default
+ // thread manager however is a bad way to limit concurrent clients
+ // as accept() will be called to grab a 5th client socket, in this case
+ // and then the thread manager will block adding the thread to manage
+ // that client.
+ baseline(10, 5);
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threadpool_bound, TServerIntegrationTestFixture<TThreadPoolServer>)
+{
+ pServer->getThreadManager()->threadFactory(
+ boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+ new apache::thrift::concurrency::PlatformThreadFactory));
+ pServer->getThreadManager()->start();
+ pServer->setConcurrentClientLimit(4); // with the limit set, the expected HWM is 4 rather than test_threadpool's 5
+
+ baseline(10, 4);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+
+
+BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture<TThreadedServer> )
+
BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected)
{
// This tests THRIFT-2441 new behavior: stopping the server disconnects clients
startServer();
- boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
+ boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser);
pClientSock1->open();
- boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
+ boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser);
pClientSock2->open();
// Ensure they have been accepted
@@ -219,8 +301,6 @@
uint8_t buf[1];
BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1)); // 0 = disconnected
BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1)); // 0 = disconnected
- pClientSock1->close();
- pClientSock2->close();
}
BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected)
@@ -230,24 +310,56 @@
boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())->
setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
+
startServer();
- boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
+ boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser);
pClientSock1->open();
- boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
+ boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser);
pClientSock2->open();
// Ensure they have been accepted
blockUntilAccepted(2);
- boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1));
- boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2));
+ boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1, milliseconds(250)));
+ boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2, milliseconds(250)));
// Once the clients disconnect the server will stop
stopServer();
-
- pClientSock1->close();
- pClientSock2->close();
+ t1.join();
+ t2.join();
}
+
+BOOST_AUTO_TEST_CASE(test_concurrent_client_limit)
+{
+  startServer();
+
+  BOOST_CHECK_EQUAL(INT64_MAX, pServer->getConcurrentClientLimit()); // default when no limit was set
+  pServer->setConcurrentClientLimit(2);
+  BOOST_CHECK_EQUAL(0, pServer->getConcurrentClientCount());
+  BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientLimit());
+
+  boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser);
+  pClientSock1->open();
+  blockUntilAccepted(1);
+  BOOST_CHECK_EQUAL(1, pServer->getConcurrentClientCount());
+
+  boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser);
+  pClientSock2->open();
+  blockUntilAccepted(2);
+  BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
+
+  // a third client cannot connect until one of the other two closes
+  boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2, milliseconds(250)));
+  boost::shared_ptr<TSocket> pClientSock3(new TSocket("localhost", m_serverPort), autoSocketCloser);
+  pClientSock3->open(); // open() itself may succeed before the server accepts; the server must not count it yet
+  blockUntilAccepted(2);
+  BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
+  BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCountHWM()); // the limit of 2 was never exceeded
+
+  stopServer();
+  t2.join(); // the delayed-close thread still holds pClientSock2; wait for it before the sockets go away
+}
+
BOOST_AUTO_TEST_SUITE_END()
diff --git a/lib/cpp/test/ZlibTest.cpp b/lib/cpp/test/ZlibTest.cpp
index bafacf9..465e12d 100644
--- a/lib/cpp/test/ZlibTest.cpp
+++ b/lib/cpp/test/ZlibTest.cpp
@@ -17,7 +17,6 @@
* under the License.
*/
-#define __STDC_LIMIT_MACROS
#define __STDC_FORMAT_MACROS
#ifndef _GNU_SOURCE