Revert "THRIFT-3768 fix TThreadedServer refactoring issues with client lifetime guarantees"
This reverts commit 0b433de5d5c7454f5410ac7b3d1ac86a07d1beef.
diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
index 8f74fa6..24bfeec 100644
--- a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
@@ -544,6 +544,7 @@
private:
const size_t workerCount_;
const size_t pendingTaskCountMax_;
+ Monitor monitor_;
};
shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
diff --git a/lib/cpp/src/thrift/server/TServerFramework.cpp b/lib/cpp/src/thrift/server/TServerFramework.cpp
index ea78bcd..56b6cca 100644
--- a/lib/cpp/src/thrift/server/TServerFramework.cpp
+++ b/lib/cpp/src/thrift/server/TServerFramework.cpp
@@ -221,27 +221,25 @@
}
void TServerFramework::newlyConnectedClient(const boost::shared_ptr<TConnectedClient>& pClient) {
- {
- // Count a concurrent client added.
- Synchronized sync(mon_);
- ++clients_;
- hwm_ = (std::max)(hwm_, clients_);
- }
-
onClientConnected(pClient);
+
+ // Count a concurrent client added.
+ Synchronized sync(mon_);
+ ++clients_;
+ hwm_ = (std::max)(hwm_, clients_);
}
void TServerFramework::disposeConnectedClient(TConnectedClient* pClient) {
+ {
+ // Count a concurrent client removed.
+ Synchronized sync(mon_);
+ if (limit_ - --clients_ > 0) {
+ mon_.notify();
+ }
+ }
onClientDisconnected(pClient);
delete pClient;
-
- // Count a concurrent client removed.
- Synchronized sync(mon_);
- if (limit_ - --clients_ > 0) {
- mon_.notify();
- }
}
-
}
}
} // apache::thrift::server
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp
index 36d1fc4..92f5cf8 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp
@@ -41,10 +41,8 @@
const shared_ptr<TTransportFactory>& transportFactory,
const shared_ptr<TProtocolFactory>& protocolFactory,
const shared_ptr<ThreadFactory>& threadFactory)
- : TThreadPoolServer(processorFactory, serverTransport, transportFactory, protocolFactory,
- apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(0, 0)) {
- threadManager_->threadFactory(threadFactory);
- threadManager_->start();
+ : TServerFramework(processorFactory, serverTransport, transportFactory, protocolFactory),
+ threadFactory_(threadFactory) {
}
TThreadedServer::TThreadedServer(const shared_ptr<TProcessor>& processor,
@@ -52,10 +50,8 @@
const shared_ptr<TTransportFactory>& transportFactory,
const shared_ptr<TProtocolFactory>& protocolFactory,
const shared_ptr<ThreadFactory>& threadFactory)
- : TThreadPoolServer(processor, serverTransport, transportFactory, protocolFactory,
- apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(0, 0)) {
- threadManager_->threadFactory(threadFactory);
- threadManager_->start();
+ : TServerFramework(processor, serverTransport, transportFactory, protocolFactory),
+ threadFactory_(threadFactory) {
}
TThreadedServer::TThreadedServer(const shared_ptr<TProcessorFactory>& processorFactory,
@@ -65,15 +61,13 @@
const shared_ptr<TProtocolFactory>& inputProtocolFactory,
const shared_ptr<TProtocolFactory>& outputProtocolFactory,
const shared_ptr<ThreadFactory>& threadFactory)
- : TThreadPoolServer(processorFactory,
- serverTransport,
- inputTransportFactory,
- outputTransportFactory,
- inputProtocolFactory,
- outputProtocolFactory,
- apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(0, 0)) {
- threadManager_->threadFactory(threadFactory);
- threadManager_->start();
+ : TServerFramework(processorFactory,
+ serverTransport,
+ inputTransportFactory,
+ outputTransportFactory,
+ inputProtocolFactory,
+ outputProtocolFactory),
+ threadFactory_(threadFactory) {
}
TThreadedServer::TThreadedServer(const shared_ptr<TProcessor>& processor,
@@ -83,29 +77,44 @@
const shared_ptr<TProtocolFactory>& inputProtocolFactory,
const shared_ptr<TProtocolFactory>& outputProtocolFactory,
const shared_ptr<ThreadFactory>& threadFactory)
- : TThreadPoolServer(processor,
- serverTransport,
- inputTransportFactory,
- outputTransportFactory,
- inputProtocolFactory,
- outputProtocolFactory,
- apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(0, 0)) {
- threadManager_->threadFactory(threadFactory);
- threadManager_->start();
+ : TServerFramework(processor,
+ serverTransport,
+ inputTransportFactory,
+ outputTransportFactory,
+ inputProtocolFactory,
+ outputProtocolFactory),
+ threadFactory_(threadFactory) {
}
TThreadedServer::~TThreadedServer() {
}
-void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) {
- if (!threadManager_->idleWorkerCount())
- {
- threadManager_->addWorker();
- }
+void TThreadedServer::serve() {
+ TServerFramework::serve();
- TThreadPoolServer::onClientConnected(pClient);
+ // Drain all clients - no more will arrive
+ try {
+ Synchronized s(clientsMonitor_);
+ while (getConcurrentClientCount() > 0) {
+ clientsMonitor_.wait();
+ }
+ } catch (TException& tx) {
+ string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
+ GlobalOutput(errStr.c_str());
+ }
}
+void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) {
+ threadFactory_->newThread(pClient)->start();
+}
+
+void TThreadedServer::onClientDisconnected(TConnectedClient* pClient) {
+ THRIFT_UNUSED_VARIABLE(pClient);
+ Synchronized s(clientsMonitor_);
+ if (getConcurrentClientCount() == 0) {
+ clientsMonitor_.notify();
+ }
+}
}
}
} // apache::thrift::server
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h
index a5b005a..cdacfd7 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.h
+++ b/lib/cpp/src/thrift/server/TThreadedServer.h
@@ -20,26 +20,19 @@
#ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_
#define _THRIFT_SERVER_TTHREADEDSERVER_H_ 1
+#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/concurrency/Thread.h>
-#include <thrift/server/TThreadPoolServer.h>
+#include <thrift/server/TServerFramework.h>
namespace apache {
namespace thrift {
namespace server {
/**
- * Manage clients using threads. Once the refactoring for THRIFT-3083 took place it became
- * obvious that the differences between the two threaded server types was becoming insignificant.
- * Therefore to satisfy THRIFT-3096 and fix THRIFT-3768, TThreadedServer is simply a wrapper
- * around TThreadedPoolServer now. If backwards compatibility was not a concern, it would have
- * been removed.
- *
- * The default thread pool size is
+ * Manage clients using a thread pool.
*/
-
-/* [[deprecated]] */
-class TThreadedServer : public TThreadPoolServer {
+class TThreadedServer : public TServerFramework {
public:
TThreadedServer(
const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory,
@@ -83,10 +76,19 @@
virtual ~TThreadedServer();
+ /**
+ * Post-conditions (return guarantees):
+ * There will be no clients connected.
+ */
+ virtual void serve();
+
protected:
virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */;
-};
+ virtual void onClientDisconnected(TConnectedClient* pClient) /* override */;
+ boost::shared_ptr<apache::thrift::concurrency::ThreadFactory> threadFactory_;
+ apache::thrift::concurrency::Monitor clientsMonitor_;
+};
}
}
} // apache::thrift::server