THRIFT-3081 consolidate client processing loop in Simple, Threaded, and Thread Pool servers
diff --git a/lib/cpp/CMakeLists.txt b/lib/cpp/CMakeLists.txt
index a965593..c11fc56 100755
--- a/lib/cpp/CMakeLists.txt
+++ b/lib/cpp/CMakeLists.txt
@@ -54,6 +54,7 @@
src/thrift/transport/TServerSocket.cpp
src/thrift/transport/TTransportUtils.cpp
src/thrift/transport/TBufferTransports.cpp
+ src/thrift/server/TConnectedClient.cpp
src/thrift/server/TServer.cpp
src/thrift/server/TSimpleServer.cpp
src/thrift/server/TThreadPoolServer.cpp
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index e6a6015..cb30bda 100755
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -89,6 +89,7 @@
src/thrift/transport/TSSLServerSocket.cpp \
src/thrift/transport/TTransportUtils.cpp \
src/thrift/transport/TBufferTransports.cpp \
+ src/thrift/server/TConnectedClient.cpp \
src/thrift/server/TServer.cpp \
src/thrift/server/TSimpleServer.cpp \
src/thrift/server/TThreadPoolServer.cpp \
diff --git a/lib/cpp/src/thrift/server/TConnectedClient.cpp b/lib/cpp/src/thrift/server/TConnectedClient.cpp
new file mode 100644
index 0000000..630c28e
--- /dev/null
+++ b/lib/cpp/src/thrift/server/TConnectedClient.cpp
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/server/TConnectedClient.h>
+
+namespace apache {
+namespace thrift {
+namespace server {
+
+using apache::thrift::TProcessor;
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::server::TServerEventHandler;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
+using boost::shared_ptr;
+using std::string;
+
+TConnectedClient::TConnectedClient(const string& serverType,
+ const shared_ptr<TProcessor>& processor,
+ const shared_ptr<TProtocol>& inputProtocol,
+ const shared_ptr<TProtocol>& outputProtocol,
+ const shared_ptr<TServerEventHandler>& eventHandler,
+ const shared_ptr<TTransport>& client)
+
+ : serverType_(serverType),
+ processor_(processor),
+ inputProtocol_(inputProtocol),
+ outputProtocol_(outputProtocol),
+ eventHandler_(eventHandler),
+ client_(client),
+ opaqueContext_(0) {}
+
+TConnectedClient::~TConnectedClient() {}
+
+void TConnectedClient::run() {
+ if (eventHandler_) {
+ opaqueContext_ = eventHandler_->createContext(inputProtocol_, outputProtocol_);
+ }
+
+ for (;;) {
+ if (eventHandler_) {
+ eventHandler_->processContext(opaqueContext_, client_);
+ }
+
+ try {
+ if (!processor_->process(inputProtocol_, outputProtocol_, opaqueContext_)) {
+ break;
+ }
+ } catch (const TTransportException& ttx) {
+ if (ttx.getType() == TTransportException::TIMED_OUT) {
+ // Receive timeout - continue processing.
+ continue;
+ } else if (ttx.getType() == TTransportException::END_OF_FILE ||
+ ttx.getType() == TTransportException::INTERRUPTED) {
+ // Client disconnected or was interrupted. No logging needed. Done.
+ break;
+ } else {
+ // All other transport exceptions are logged.
+ // State of connection is unknown. Done.
+ string errStr = (serverType_ + " client died: ") + ttx.what();
+ GlobalOutput(errStr.c_str());
+ break;
+ }
+ } catch (const TException& tex) {
+ // Some protocols throw this after they send an error response to the client
+ // They should be trained to return true instead and if they want to log,
+ // then they should log.
+ string errStr = (serverType_ + " processing exception: ") + tex.what();
+ GlobalOutput(errStr.c_str());
+ // Continue processing
+ }
+ }
+
+ cleanup();
+}
+
+void TConnectedClient::cleanup()
+{
+ if (eventHandler_) {
+ eventHandler_->deleteContext(opaqueContext_, inputProtocol_, outputProtocol_);
+ }
+
+ try {
+ inputProtocol_->getTransport()->close();
+ } catch (const TTransportException& ttx) {
+ string errStr = string(serverType_ + " input close failed: ") + ttx.what();
+ GlobalOutput(errStr.c_str());
+ }
+ try {
+ outputProtocol_->getTransport()->close();
+ } catch (const TTransportException& ttx) {
+ string errStr = string(serverType_ + " output close failed: ") + ttx.what();
+ GlobalOutput(errStr.c_str());
+ }
+ try {
+ client_->close();
+ } catch (const TTransportException& ttx) {
+ string errStr = string(serverType_ + " client close failed: ") + ttx.what();
+ GlobalOutput(errStr.c_str());
+ }
+}
+
+}
+}
+} // apache::thrift::server
diff --git a/lib/cpp/src/thrift/server/TConnectedClient.h b/lib/cpp/src/thrift/server/TConnectedClient.h
new file mode 100644
index 0000000..6304398
--- /dev/null
+++ b/lib/cpp/src/thrift/server/TConnectedClient.h
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_SERVER_TCONNECTEDCLIENT_H_
+#define _THRIFT_SERVER_TCONNECTEDCLIENT_H_ 1
+
+#include <boost/shared_ptr.hpp>
+#include <thrift/TProcessor.h>
+#include <thrift/protocol/TProtocol.h>
+#include <thrift/server/TServer.h>
+#include <thrift/transport/TTransport.h>
+
+namespace apache {
+namespace thrift {
+namespace server {
+
+/**
+ * This represents a client connected to a TServer. The
+ * processing loop for a client must provide some required
+ * functionality common to all implementations so it is
+ * encapsulated here.
+ */
+
+class TConnectedClient : public apache::thrift::concurrency::Runnable
+{
+ public:
+ /**
+ * Constructor.
+ *
+ * @param[in] serverType the server type as a string, used
+ * for logging output.
+ * @param[in] processor the TProcessor
+ * @param[in] inputProtocol the input TProtocol
+ * @param[in] outputProtocol the output TProtocol
+ * @param[in] eventHandler the server event handler
+ * @param[in] client the TTransport representing the client
+ */
+ TConnectedClient(
+ const std::string& serverType,
+ const boost::shared_ptr<apache::thrift::TProcessor>& processor,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocol>& inputProtocol,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocol>& outputProtocol,
+ const boost::shared_ptr<apache::thrift::server::TServerEventHandler>& eventHandler,
+ const boost::shared_ptr<apache::thrift::transport::TTransport>& client);
+
+ /**
+ * Destructor.
+ */
+ virtual ~TConnectedClient();
+
+ /**
+ * Drive the client until it is done.
+ * The client processing loop is:
+ *
+ * [optional] call eventHandler->createContext once
+ * [optional] call eventHandler->processContext per request
+ * call processor->process per request
+ * handle expected transport exceptions:
+ * END_OF_FILE means the client is gone
+ * INTERRUPTED means the client was interrupted
+ * by TServerTransport::interruptChildren()
+ * handle unexpected transport exceptions by logging
+ * handle standard exceptions by logging
+ * handle unexpected exceptions by logging
+ * cleanup()
+ */
+ virtual void run() /* override */;
+
+ protected:
+ /**
+ * Cleanup after a client. This happens if the client disconnects,
+ * or if the server is stopped, or if an exception occurs.
+ *
+ * The cleanup processing is:
+ * [optional] call eventHandler->deleteContext once
+ * close the inputProtocol's TTransport
+ * close the outputProtocol's TTransport
+ * close the client
+ */
+ virtual void cleanup();
+
+ private:
+ std::string serverType_;
+ boost::shared_ptr<apache::thrift::TProcessor> processor_;
+ boost::shared_ptr<apache::thrift::protocol::TProtocol> inputProtocol_;
+ boost::shared_ptr<apache::thrift::protocol::TProtocol> outputProtocol_;
+ boost::shared_ptr<apache::thrift::server::TServerEventHandler> eventHandler_;
+ boost::shared_ptr<apache::thrift::transport::TTransport> client_;
+
+ /**
+ * Context acquired from the eventHandler_ if one exists.
+ */
+ void *opaqueContext_;
+};
+
+}
+}
+}
+
+#endif // #ifndef _THRIFT_SERVER_TCONNECTEDCLIENT_H_
diff --git a/lib/cpp/src/thrift/server/TSimpleServer.cpp b/lib/cpp/src/thrift/server/TSimpleServer.cpp
index 19f44ac..b63c45e 100644
--- a/lib/cpp/src/thrift/server/TSimpleServer.cpp
+++ b/lib/cpp/src/thrift/server/TSimpleServer.cpp
@@ -17,6 +17,7 @@
* under the License.
*/
+#include <thrift/server/TConnectedClient.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TTransportException.h>
#include <string>
@@ -103,58 +104,9 @@
break;
}
- // Get the processor
- shared_ptr<TProcessor> processor = getProcessor(inputProtocol, outputProtocol, client);
-
- void* connectionContext = NULL;
- if (eventHandler_) {
- connectionContext = eventHandler_->createContext(inputProtocol, outputProtocol);
- }
- try {
- for (;;) {
- if (eventHandler_) {
- eventHandler_->processContext(connectionContext, client);
- }
- if (!processor->process(inputProtocol, outputProtocol, connectionContext) ||
- // Peek ahead, is the remote side closed?
- !inputProtocol->getTransport()->peek()) {
- break;
- }
- }
- } catch (const TTransportException& ttx) {
- if (ttx.getType() != TTransportException::END_OF_FILE &&
- ttx.getType() != TTransportException::INTERRUPTED)
- {
- string errStr = string("TSimpleServer client died: ") + ttx.what();
- GlobalOutput(errStr.c_str());
- }
- } catch (const std::exception& x) {
- GlobalOutput.printf("TSimpleServer exception: %s: %s", typeid(x).name(), x.what());
- } catch (...) {
- GlobalOutput("TSimpleServer uncaught exception.");
- }
- if (eventHandler_) {
- eventHandler_->deleteContext(connectionContext, inputProtocol, outputProtocol);
- }
-
- try {
- inputTransport->close();
- } catch (const TTransportException& ttx) {
- string errStr = string("TSimpleServer input close failed: ") + ttx.what();
- GlobalOutput(errStr.c_str());
- }
- try {
- outputTransport->close();
- } catch (const TTransportException& ttx) {
- string errStr = string("TSimpleServer output close failed: ") + ttx.what();
- GlobalOutput(errStr.c_str());
- }
- try {
- client->close();
- } catch (const TTransportException& ttx) {
- string errStr = string("TSimpleServer client close failed: ") + ttx.what();
- GlobalOutput(errStr.c_str());
- }
+ TConnectedClient("TSimpleServer",
+ getProcessor(inputProtocol, outputProtocol, client),
+ inputProtocol, outputProtocol, eventHandler_, client).run();
}
if (stop_) {
diff --git a/lib/cpp/src/thrift/server/TSimpleServer.h b/lib/cpp/src/thrift/server/TSimpleServer.h
index 941f12b..7b8677d 100644
--- a/lib/cpp/src/thrift/server/TSimpleServer.h
+++ b/lib/cpp/src/thrift/server/TSimpleServer.h
@@ -94,7 +94,7 @@
void serve();
/**
- * Interrupt serve() so that it meets post-conditions.
+ * Interrupt serve() so that it meets post-conditions and returns.
*/
void stop();
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
index 58cfe3e..f8ed6cf 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
@@ -19,12 +19,14 @@
#include <thrift/thrift-config.h>
+#include <thrift/server/TConnectedClient.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/transport/TTransportException.h>
#include <thrift/concurrency/Thread.h>
#include <thrift/concurrency/ThreadManager.h>
#include <string>
#include <iostream>
+#include <boost/make_shared.hpp>
namespace apache {
namespace thrift {
@@ -37,78 +39,6 @@
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
-class TThreadPoolServer::Task : public Runnable {
-
-public:
- Task(TThreadPoolServer& server,
- shared_ptr<TProcessor> processor,
- shared_ptr<TProtocol> input,
- shared_ptr<TProtocol> output,
- shared_ptr<TTransport> transport)
- : server_(server),
- processor_(processor),
- input_(input),
- output_(output),
- transport_(transport) {}
-
- ~Task() {}
-
- void run() {
- boost::shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler();
- void* connectionContext = NULL;
- if (eventHandler) {
- connectionContext = eventHandler->createContext(input_, output_);
- }
- try {
- for (;;) {
- if (eventHandler) {
- eventHandler->processContext(connectionContext, transport_);
- }
- if (!processor_->process(input_, output_, connectionContext)
- || !input_->getTransport()->peek()) {
- break;
- }
- }
- } catch (const TTransportException& ttx) {
- if (ttx.getType() != TTransportException::END_OF_FILE &&
- ttx.getType() != TTransportException::INTERRUPTED) {
- string errStr = string("TThreadPoolServer::Task client died: ") + ttx.what();
- GlobalOutput(errStr.c_str());
- }
- } catch (const std::exception& x) {
- GlobalOutput.printf("TThreadPoolServer exception %s: %s", typeid(x).name(), x.what());
- } catch (...) {
- GlobalOutput(
- "TThreadPoolServer, unexpected exception in "
- "TThreadPoolServer::Task::run()");
- }
-
- if (eventHandler) {
- eventHandler->deleteContext(connectionContext, input_, output_);
- }
-
- try {
- input_->getTransport()->close();
- } catch (TTransportException& ttx) {
- string errStr = string("TThreadPoolServer input close failed: ") + ttx.what();
- GlobalOutput(errStr.c_str());
- }
- try {
- output_->getTransport()->close();
- } catch (TTransportException& ttx) {
- string errStr = string("TThreadPoolServer output close failed: ") + ttx.what();
- GlobalOutput(errStr.c_str());
- }
- }
-
-private:
- TServer& server_;
- shared_ptr<TProcessor> processor_;
- shared_ptr<TProtocol> input_;
- shared_ptr<TProtocol> output_;
- shared_ptr<TTransport> transport_;
-};
-
TThreadPoolServer::~TThreadPoolServer() {}
void TThreadPoolServer::serve() {
@@ -146,9 +76,13 @@
shared_ptr<TProcessor> processor = getProcessor(inputProtocol, outputProtocol, client);
// Add to threadmanager pool
- shared_ptr<TThreadPoolServer::Task> task(
- new TThreadPoolServer::Task(*this, processor, inputProtocol, outputProtocol, client));
- threadManager_->add(task, timeout_, taskExpiration_);
+ threadManager_->add(
+ boost::make_shared<TConnectedClient>(
+ "TThreadPoolServer",
+ getProcessor(inputProtocol, outputProtocol, client),
+ inputProtocol, outputProtocol, eventHandler_, client),
+ timeout_,
+ taskExpiration_);
} catch (TTransportException& ttx) {
if (inputTransport) {
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.h b/lib/cpp/src/thrift/server/TThreadPoolServer.h
index 1696700..2f93463 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.h
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.h
@@ -37,8 +37,6 @@
class TThreadPoolServer : public TServer {
public:
- class Task;
-
template <typename ProcessorFactory>
TThreadPoolServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
const boost::shared_ptr<TServerTransport>& serverTransport,
@@ -107,8 +105,19 @@
virtual ~TThreadPoolServer();
+ /**
+ * Process all connections that arrive using a thread pool.
+ * Call stop() on another thread to interrupt processing and
+ * return control to the caller.
+ * Post-conditions (return guarantees):
+ * The serverTransport will be closed.
+ * There will be no connected clients.
+ */
virtual void serve();
+ /**
+ * Interrupt serve() so that it meets post-conditions and returns.
+ */
virtual void stop();
virtual int64_t getTimeout() const;
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp
index 118c9cb..4dcdb44 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp
@@ -17,6 +17,8 @@
* under the License.
*/
+#include <boost/bind.hpp>
+#include <thrift/server/TConnectedClient.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/transport/TTransportException.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
@@ -39,94 +41,6 @@
using namespace apache::thrift::transport;
using namespace apache::thrift::concurrency;
-class TThreadedServer::Task : public Runnable {
-
-public:
- Task(TThreadedServer& server,
- shared_ptr<TProcessor> processor,
- shared_ptr<TProtocol> input,
- shared_ptr<TProtocol> output,
- shared_ptr<TTransport> transport)
- : server_(server),
- processor_(processor),
- input_(input),
- output_(output),
- transport_(transport) {}
-
- ~Task() {}
-
- void run() {
- boost::shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler();
- void* connectionContext = NULL;
- if (eventHandler) {
- connectionContext = eventHandler->createContext(input_, output_);
- }
- try {
- for (;;) {
- if (eventHandler) {
- eventHandler->processContext(connectionContext, transport_);
- }
- if (!processor_->process(input_, output_, connectionContext)
- || !input_->getTransport()->peek()) {
- break;
- }
- }
- } catch (const TTransportException& ttx) {
- if (ttx.getType() != TTransportException::END_OF_FILE &&
- ttx.getType() != TTransportException::INTERRUPTED) {
- string errStr = string("TThreadedServer client died: ") + ttx.what();
- GlobalOutput(errStr.c_str());
- }
- } catch (const std::exception& x) {
- GlobalOutput.printf("TThreadedServer exception: %s: %s", typeid(x).name(), x.what());
- } catch (...) {
- GlobalOutput("TThreadedServer uncaught exception.");
- }
- if (eventHandler) {
- eventHandler->deleteContext(connectionContext, input_, output_);
- }
-
- try {
- input_->getTransport()->close();
- } catch (TTransportException& ttx) {
- string errStr = string("TThreadedServer input close failed: ") + ttx.what();
- GlobalOutput(errStr.c_str());
- }
- try {
- output_->getTransport()->close();
- } catch (TTransportException& ttx) {
- string errStr = string("TThreadedServer output close failed: ") + ttx.what();
- GlobalOutput(errStr.c_str());
- }
-
- // Remove this task from parent bookkeeping
- {
- Synchronized s(server_.tasksMonitor_);
- server_.tasks_.erase(this);
- if (server_.tasks_.empty()) {
- server_.tasksMonitor_.notify();
- }
- }
- }
-
-private:
- TThreadedServer& server_;
- friend class TThreadedServer;
-
- shared_ptr<TProcessor> processor_;
- shared_ptr<TProtocol> input_;
- shared_ptr<TProtocol> output_;
- shared_ptr<TTransport> transport_;
-};
-
-void TThreadedServer::init() {
- stop_ = false;
-
- if (!threadFactory_) {
- threadFactory_.reset(new PlatformThreadFactory);
- }
-}
-
TThreadedServer::~TThreadedServer() {}
void TThreadedServer::serve() {
@@ -162,21 +76,19 @@
inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
- shared_ptr<TProcessor> processor = getProcessor(inputProtocol, outputProtocol, client);
+ shared_ptr<TConnectedClient> pClient(
+ new TConnectedClient("TThreadedServer",
+ getProcessor(inputProtocol, outputProtocol, client),
+ inputProtocol, outputProtocol, eventHandler_, client),
+ boost::bind(&TThreadedServer::disposeClient, this, _1));
- TThreadedServer::Task* task
- = new TThreadedServer::Task(*this, processor, inputProtocol, outputProtocol, client);
-
- // Create a task
- shared_ptr<Runnable> runnable = shared_ptr<Runnable>(task);
-
- // Create a thread for this task
- shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(runnable));
+ // Create a thread for this client
+ shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(pClient));
// Insert thread into the set of threads
{
- Synchronized s(tasksMonitor_);
- tasks_.insert(task);
+ Synchronized s(clientsMonitor_);
+ clients_.insert(pClient.get());
}
// Start the thread!
@@ -235,9 +147,9 @@
GlobalOutput(errStr.c_str());
}
try {
- Synchronized s(tasksMonitor_);
- while (!tasks_.empty()) {
- tasksMonitor_.wait();
+ Synchronized s(clientsMonitor_);
+ while (!clients_.empty()) {
+ clientsMonitor_.wait();
}
} catch (TException& tx) {
string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
@@ -254,6 +166,19 @@
serverTransport_->interruptChildren();
}
}
+
+void TThreadedServer::disposeClient(TConnectedClient *pClient) {
+ // Remove this task from parent bookkeeping
+ {
+ Synchronized s(clientsMonitor_);
+ clients_.erase(pClient);
+ if (clients_.empty()) {
+ clientsMonitor_.notify();
+ }
+ }
+ delete pClient;
+}
+
}
}
} // apache::thrift::server
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h
index b9b24fe..5d510d6 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.h
+++ b/lib/cpp/src/thrift/server/TThreadedServer.h
@@ -20,9 +20,11 @@
#ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_
#define _THRIFT_SERVER_TTHREADEDSERVER_H_ 1
+#include <set>
#include <thrift/server/TServer.h>
#include <thrift/transport/TServerTransport.h>
#include <thrift/concurrency/Monitor.h>
+#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/concurrency/Thread.h>
#include <boost/shared_ptr.hpp>
@@ -35,19 +37,22 @@
using apache::thrift::transport::TServerTransport;
using apache::thrift::transport::TTransportFactory;
using apache::thrift::concurrency::Monitor;
+using apache::thrift::concurrency::PlatformThreadFactory;
using apache::thrift::concurrency::ThreadFactory;
+class TConnectedClient;
+
class TThreadedServer : public TServer {
-
public:
- class Task;
-
template <typename ProcessorFactory>
TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
const boost::shared_ptr<TServerTransport>& serverTransport,
const boost::shared_ptr<TTransportFactory>& transportFactory,
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
- THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory));
+ THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory))
+ : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
+ threadFactory_(new PlatformThreadFactory),
+ stop_(false) {}
template <typename ProcessorFactory>
TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
@@ -55,14 +60,20 @@
const boost::shared_ptr<TTransportFactory>& transportFactory,
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
const boost::shared_ptr<ThreadFactory>& threadFactory,
- THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory));
+ THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory))
+ : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
+ threadFactory_(threadFactory),
+ stop_(false) {}
template <typename Processor>
TThreadedServer(const boost::shared_ptr<Processor>& processor,
const boost::shared_ptr<TServerTransport>& serverTransport,
const boost::shared_ptr<TTransportFactory>& transportFactory,
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
- THRIFT_OVERLOAD_IF(Processor, TProcessor));
+ THRIFT_OVERLOAD_IF(Processor, TProcessor))
+ : TServer(processor, serverTransport, transportFactory, protocolFactory),
+ threadFactory_(new PlatformThreadFactory),
+ stop_(false) {}
template <typename Processor>
TThreadedServer(const boost::shared_ptr<Processor>& processor,
@@ -70,66 +81,43 @@
const boost::shared_ptr<TTransportFactory>& transportFactory,
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
const boost::shared_ptr<ThreadFactory>& threadFactory,
- THRIFT_OVERLOAD_IF(Processor, TProcessor));
+ THRIFT_OVERLOAD_IF(Processor, TProcessor))
+ : TServer(processor, serverTransport, transportFactory, protocolFactory),
+ threadFactory_(threadFactory),
+ stop_(false) {}
virtual ~TThreadedServer();
+ /**
+ * Process all connections that arrive, each on their own
+ * dedicated thread. There is no limit to the number of
+ * threads or connections (see THRIFT-3084).
+ * Call stop() on another thread to interrupt processing and
+ * return control to the caller.
+ * Post-conditions (return guarantees):
+ * The serverTransport will be closed.
+ * There will be no connected clients.
+ */
virtual void serve();
- void stop();
+
+ /**
+ * Interrupt serve() so that it meets post-conditions and returns.
+ */
+ virtual void stop();
protected:
- void init();
+ /**
+ * Smart pointer release method
+ */
+ virtual void disposeClient(TConnectedClient *pClient);
boost::shared_ptr<ThreadFactory> threadFactory_;
volatile bool stop_;
- Monitor tasksMonitor_;
- std::set<Task*> tasks_;
+ Monitor clientsMonitor_;
+ std::set<TConnectedClient*> clients_;
};
-template <typename ProcessorFactory>
-TThreadedServer::TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
- const boost::shared_ptr<TServerTransport>& serverTransport,
- const boost::shared_ptr<TTransportFactory>& transportFactory,
- const boost::shared_ptr<TProtocolFactory>& protocolFactory,
- THRIFT_OVERLOAD_IF_DEFN(ProcessorFactory, TProcessorFactory))
- : TServer(processorFactory, serverTransport, transportFactory, protocolFactory) {
- init();
-}
-
-template <typename ProcessorFactory>
-TThreadedServer::TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
- const boost::shared_ptr<TServerTransport>& serverTransport,
- const boost::shared_ptr<TTransportFactory>& transportFactory,
- const boost::shared_ptr<TProtocolFactory>& protocolFactory,
- const boost::shared_ptr<ThreadFactory>& threadFactory,
- THRIFT_OVERLOAD_IF_DEFN(ProcessorFactory, TProcessorFactory))
- : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
- threadFactory_(threadFactory) {
- init();
-}
-
-template <typename Processor>
-TThreadedServer::TThreadedServer(const boost::shared_ptr<Processor>& processor,
- const boost::shared_ptr<TServerTransport>& serverTransport,
- const boost::shared_ptr<TTransportFactory>& transportFactory,
- const boost::shared_ptr<TProtocolFactory>& protocolFactory,
- THRIFT_OVERLOAD_IF_DEFN(Processor, TProcessor))
- : TServer(processor, serverTransport, transportFactory, protocolFactory) {
- init();
-}
-
-template <typename Processor>
-TThreadedServer::TThreadedServer(const boost::shared_ptr<Processor>& processor,
- const boost::shared_ptr<TServerTransport>& serverTransport,
- const boost::shared_ptr<TTransportFactory>& transportFactory,
- const boost::shared_ptr<TProtocolFactory>& protocolFactory,
- const boost::shared_ptr<ThreadFactory>& threadFactory,
- THRIFT_OVERLOAD_IF_DEFN(Processor, TProcessor))
- : TServer(processor, serverTransport, transportFactory, protocolFactory),
- threadFactory_(threadFactory) {
- init();
-}
}
}
} // apache::thrift::server