THRIFT-3768 fix TThreadedServer refactoring issues with client lifetime guarantees
Client: C++
Patch: Jim King <jim.king@simplivity.com>
This closes #977
diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
index 24bfeec..8f74fa6 100644
--- a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
@@ -544,7 +544,6 @@
private:
const size_t workerCount_;
const size_t pendingTaskCountMax_;
- Monitor monitor_;
};
shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
diff --git a/lib/cpp/src/thrift/server/TServerFramework.cpp b/lib/cpp/src/thrift/server/TServerFramework.cpp
index 9fdd694..c617ad0 100644
--- a/lib/cpp/src/thrift/server/TServerFramework.cpp
+++ b/lib/cpp/src/thrift/server/TServerFramework.cpp
@@ -219,25 +219,27 @@
}
void TServerFramework::newlyConnectedClient(const boost::shared_ptr<TConnectedClient>& pClient) {
- onClientConnected(pClient);
+ {
+ // Count a concurrent client added.
+ Synchronized sync(mon_);
+ ++clients_;
+ hwm_ = (std::max)(hwm_, clients_);
+ }
- // Count a concurrent client added.
- Synchronized sync(mon_);
- ++clients_;
- hwm_ = (std::max)(hwm_, clients_);
+ onClientConnected(pClient);
}
void TServerFramework::disposeConnectedClient(TConnectedClient* pClient) {
- {
- // Count a concurrent client removed.
- Synchronized sync(mon_);
- if (limit_ - --clients_ > 0) {
- mon_.notify();
- }
- }
onClientDisconnected(pClient);
delete pClient;
+
+ // Count a concurrent client removed.
+ Synchronized sync(mon_);
+ if (limit_ - --clients_ > 0) {
+ mon_.notify();
+ }
}
+
}
}
} // apache::thrift::server
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp
index 92f5cf8..36d1fc4 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp
@@ -41,8 +41,10 @@
const shared_ptr<TTransportFactory>& transportFactory,
const shared_ptr<TProtocolFactory>& protocolFactory,
const shared_ptr<ThreadFactory>& threadFactory)
- : TServerFramework(processorFactory, serverTransport, transportFactory, protocolFactory),
- threadFactory_(threadFactory) {
+ : TThreadPoolServer(processorFactory, serverTransport, transportFactory, protocolFactory,
+ apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(0, 0)) {
+ threadManager_->threadFactory(threadFactory);
+ threadManager_->start();
}
TThreadedServer::TThreadedServer(const shared_ptr<TProcessor>& processor,
@@ -50,8 +52,10 @@
const shared_ptr<TTransportFactory>& transportFactory,
const shared_ptr<TProtocolFactory>& protocolFactory,
const shared_ptr<ThreadFactory>& threadFactory)
- : TServerFramework(processor, serverTransport, transportFactory, protocolFactory),
- threadFactory_(threadFactory) {
+ : TThreadPoolServer(processor, serverTransport, transportFactory, protocolFactory,
+ apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(0, 0)) {
+ threadManager_->threadFactory(threadFactory);
+ threadManager_->start();
}
TThreadedServer::TThreadedServer(const shared_ptr<TProcessorFactory>& processorFactory,
@@ -61,13 +65,15 @@
const shared_ptr<TProtocolFactory>& inputProtocolFactory,
const shared_ptr<TProtocolFactory>& outputProtocolFactory,
const shared_ptr<ThreadFactory>& threadFactory)
- : TServerFramework(processorFactory,
- serverTransport,
- inputTransportFactory,
- outputTransportFactory,
- inputProtocolFactory,
- outputProtocolFactory),
- threadFactory_(threadFactory) {
+ : TThreadPoolServer(processorFactory,
+ serverTransport,
+ inputTransportFactory,
+ outputTransportFactory,
+ inputProtocolFactory,
+ outputProtocolFactory,
+ apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(0, 0)) {
+ threadManager_->threadFactory(threadFactory);
+ threadManager_->start();
}
TThreadedServer::TThreadedServer(const shared_ptr<TProcessor>& processor,
@@ -77,44 +83,29 @@
const shared_ptr<TProtocolFactory>& inputProtocolFactory,
const shared_ptr<TProtocolFactory>& outputProtocolFactory,
const shared_ptr<ThreadFactory>& threadFactory)
- : TServerFramework(processor,
- serverTransport,
- inputTransportFactory,
- outputTransportFactory,
- inputProtocolFactory,
- outputProtocolFactory),
- threadFactory_(threadFactory) {
+ : TThreadPoolServer(processor,
+ serverTransport,
+ inputTransportFactory,
+ outputTransportFactory,
+ inputProtocolFactory,
+ outputProtocolFactory,
+ apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(0, 0)) {
+ threadManager_->threadFactory(threadFactory);
+ threadManager_->start();
}
TThreadedServer::~TThreadedServer() {
}
-void TThreadedServer::serve() {
- TServerFramework::serve();
-
- // Drain all clients - no more will arrive
- try {
- Synchronized s(clientsMonitor_);
- while (getConcurrentClientCount() > 0) {
- clientsMonitor_.wait();
- }
- } catch (TException& tx) {
- string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
- GlobalOutput(errStr.c_str());
- }
-}
-
void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) {
- threadFactory_->newThread(pClient)->start();
+ if (!threadManager_->idleWorkerCount())
+ {
+ threadManager_->addWorker();
+ }
+
+ TThreadPoolServer::onClientConnected(pClient);
}
-void TThreadedServer::onClientDisconnected(TConnectedClient* pClient) {
- THRIFT_UNUSED_VARIABLE(pClient);
- Synchronized s(clientsMonitor_);
- if (getConcurrentClientCount() == 0) {
- clientsMonitor_.notify();
- }
-}
}
}
} // apache::thrift::server
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h
index cdacfd7..a5b005a 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.h
+++ b/lib/cpp/src/thrift/server/TThreadedServer.h
@@ -20,19 +20,26 @@
#ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_
#define _THRIFT_SERVER_TTHREADEDSERVER_H_ 1
-#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/concurrency/Thread.h>
-#include <thrift/server/TServerFramework.h>
+#include <thrift/server/TThreadPoolServer.h>
namespace apache {
namespace thrift {
namespace server {
/**
- * Manage clients using a thread pool.
+ * Manage clients using threads. Once the refactoring for THRIFT-3083 took place it became
+ * obvious that the differences between the two threaded server types was becoming insignificant.
+ * Therefore to satisfy THRIFT-3096 and fix THRIFT-3768, TThreadedServer is simply a wrapper
+ * around TThreadedPoolServer now. If backwards compatibility was not a concern, it would have
+ * been removed.
+ *
+ * The default thread pool size is
*/
-class TThreadedServer : public TServerFramework {
+
+/* [[deprecated]] */
+class TThreadedServer : public TThreadPoolServer {
public:
TThreadedServer(
const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory,
@@ -76,19 +83,10 @@
virtual ~TThreadedServer();
- /**
- * Post-conditions (return guarantees):
- * There will be no clients connected.
- */
- virtual void serve();
-
protected:
virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */;
- virtual void onClientDisconnected(TConnectedClient* pClient) /* override */;
-
- boost::shared_ptr<apache::thrift::concurrency::ThreadFactory> threadFactory_;
- apache::thrift::concurrency::Monitor clientsMonitor_;
};
+
}
}
} // apache::thrift::server