THRIFT-3083 consolidate simple and threaded server run loops
diff --git a/lib/cpp/CMakeLists.txt b/lib/cpp/CMakeLists.txt
index c11fc56..8ea0546 100755
--- a/lib/cpp/CMakeLists.txt
+++ b/lib/cpp/CMakeLists.txt
@@ -56,6 +56,7 @@
src/thrift/transport/TBufferTransports.cpp
src/thrift/server/TConnectedClient.cpp
src/thrift/server/TServer.cpp
+ src/thrift/server/TServerFramework.cpp
src/thrift/server/TSimpleServer.cpp
src/thrift/server/TThreadPoolServer.cpp
src/thrift/server/TThreadedServer.cpp
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index cb30bda..28ff7c8 100755
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -91,6 +91,7 @@
src/thrift/transport/TBufferTransports.cpp \
src/thrift/server/TConnectedClient.cpp \
src/thrift/server/TServer.cpp \
+ src/thrift/server/TServerFramework.cpp \
src/thrift/server/TSimpleServer.cpp \
src/thrift/server/TThreadPoolServer.cpp \
src/thrift/server/TThreadedServer.cpp \
diff --git a/lib/cpp/src/thrift/server/TConnectedClient.cpp b/lib/cpp/src/thrift/server/TConnectedClient.cpp
index 630c28e..86a81e2 100644
--- a/lib/cpp/src/thrift/server/TConnectedClient.cpp
+++ b/lib/cpp/src/thrift/server/TConnectedClient.cpp
@@ -31,15 +31,13 @@
using boost::shared_ptr;
using std::string;
-TConnectedClient::TConnectedClient(const string& serverType,
- const shared_ptr<TProcessor>& processor,
+TConnectedClient::TConnectedClient(const shared_ptr<TProcessor>& processor,
const shared_ptr<TProtocol>& inputProtocol,
const shared_ptr<TProtocol>& outputProtocol,
const shared_ptr<TServerEventHandler>& eventHandler,
const shared_ptr<TTransport>& client)
- : serverType_(serverType),
- processor_(processor),
+ : processor_(processor),
inputProtocol_(inputProtocol),
outputProtocol_(outputProtocol),
eventHandler_(eventHandler),
@@ -53,7 +51,7 @@
opaqueContext_ = eventHandler_->createContext(inputProtocol_, outputProtocol_);
}
- for (;;) {
+ for (bool done = false; !done; ) {
if (eventHandler_) {
eventHandler_->processContext(opaqueContext_, client_);
}
@@ -63,25 +61,30 @@
break;
}
} catch (const TTransportException& ttx) {
- if (ttx.getType() == TTransportException::TIMED_OUT) {
- // Receive timeout - continue processing.
- continue;
- } else if (ttx.getType() == TTransportException::END_OF_FILE ||
- ttx.getType() == TTransportException::INTERRUPTED) {
- // Client disconnected or was interrupted. No logging needed. Done.
- break;
- } else {
- // All other transport exceptions are logged.
- // State of connection is unknown. Done.
- string errStr = (serverType_ + " client died: ") + ttx.what();
- GlobalOutput(errStr.c_str());
- break;
+ switch (ttx.getType())
+ {
+ case TTransportException::TIMED_OUT:
+ // Receive timeout - continue processing.
+ continue;
+
+ case TTransportException::END_OF_FILE:
+ case TTransportException::INTERRUPTED:
+ // Client disconnected or was interrupted. No logging needed. Done.
+ done = true;
+ break;
+
+ default:
+ {
+ // All other transport exceptions are logged.
+ // State of connection is unknown. Done.
+ string errStr = string("TConnectedClient died: ") + ttx.what();
+ GlobalOutput(errStr.c_str());
+ done = true;
+ break;
+ }
}
} catch (const TException& tex) {
- // Some protocols throw this after they send an error response to the client
- // They should be trained to return true instead and if they want to log,
- // then they should log.
- string errStr = (serverType_ + " processing exception: ") + tex.what();
+ string errStr = string("TConnectedClient processing exception: ") + tex.what();
GlobalOutput(errStr.c_str());
// Continue processing
}
@@ -99,19 +102,21 @@
try {
inputProtocol_->getTransport()->close();
} catch (const TTransportException& ttx) {
- string errStr = string(serverType_ + " input close failed: ") + ttx.what();
+ string errStr = string("TConnectedClient input close failed: ") + ttx.what();
GlobalOutput(errStr.c_str());
}
+
try {
outputProtocol_->getTransport()->close();
} catch (const TTransportException& ttx) {
- string errStr = string(serverType_ + " output close failed: ") + ttx.what();
+ string errStr = string("TConnectedClient output close failed: ") + ttx.what();
GlobalOutput(errStr.c_str());
}
+
try {
client_->close();
} catch (const TTransportException& ttx) {
- string errStr = string(serverType_ + " client close failed: ") + ttx.what();
+ string errStr = string("TConnectedClient client close failed: ") + ttx.what();
GlobalOutput(errStr.c_str());
}
}
diff --git a/lib/cpp/src/thrift/server/TConnectedClient.h b/lib/cpp/src/thrift/server/TConnectedClient.h
index 6304398..8931335 100644
--- a/lib/cpp/src/thrift/server/TConnectedClient.h
+++ b/lib/cpp/src/thrift/server/TConnectedClient.h
@@ -43,8 +43,6 @@
/**
* Constructor.
*
- * @param[in] serverType the server type as a string, used
- * for logging output.
* @param[in] processor the TProcessor
* @param[in] inputProtocol the input TProtocol
* @param[in] outputProtocol the output TProtocol
@@ -52,7 +50,6 @@
* @param[in] client the TTransport representing the client
*/
TConnectedClient(
- const std::string& serverType,
const boost::shared_ptr<apache::thrift::TProcessor>& processor,
const boost::shared_ptr<apache::thrift::protocol::TProtocol>& inputProtocol,
const boost::shared_ptr<apache::thrift::protocol::TProtocol>& outputProtocol,
@@ -96,7 +93,6 @@
virtual void cleanup();
private:
- std::string serverType_;
boost::shared_ptr<apache::thrift::TProcessor> processor_;
boost::shared_ptr<apache::thrift::protocol::TProtocol> inputProtocol_;
boost::shared_ptr<apache::thrift::protocol::TProtocol> outputProtocol_;
diff --git a/lib/cpp/src/thrift/server/TServerFramework.cpp b/lib/cpp/src/thrift/server/TServerFramework.cpp
new file mode 100644
index 0000000..8adb29a
--- /dev/null
+++ b/lib/cpp/src/thrift/server/TServerFramework.cpp
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <boost/bind.hpp>
+#include <thrift/server/TServerFramework.h>
+
+namespace apache {
+namespace thrift {
+namespace server {
+
+using apache::thrift::transport::TServerTransport;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
+using apache::thrift::transport::TTransportFactory;
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::protocol::TProtocolFactory;
+using boost::bind;
+using boost::shared_ptr;
+using std::string;
+
+TServerFramework::TServerFramework(
+ const shared_ptr<TProcessorFactory>& processorFactory,
+ const shared_ptr<TServerTransport>& serverTransport,
+ const shared_ptr<TTransportFactory>& transportFactory,
+ const shared_ptr<TProtocolFactory>& protocolFactory)
+ : TServer(processorFactory, serverTransport, transportFactory, protocolFactory) {}
+
+TServerFramework::TServerFramework(
+ const shared_ptr<TProcessor>& processor,
+ const shared_ptr<TServerTransport>& serverTransport,
+ const shared_ptr<TTransportFactory>& transportFactory,
+ const shared_ptr<TProtocolFactory>& protocolFactory)
+ : TServer(processor, serverTransport, transportFactory, protocolFactory) {}
+
+TServerFramework::TServerFramework(
+ const shared_ptr<TProcessorFactory>& processorFactory,
+ const shared_ptr<TServerTransport>& serverTransport,
+ const shared_ptr<TTransportFactory>& inputTransportFactory,
+ const shared_ptr<TTransportFactory>& outputTransportFactory,
+ const shared_ptr<TProtocolFactory>& inputProtocolFactory,
+ const shared_ptr<TProtocolFactory>& outputProtocolFactory)
+ : TServer(processorFactory, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory) {}
+
+TServerFramework::TServerFramework(
+ const shared_ptr<TProcessor>& processor,
+ const shared_ptr<TServerTransport>& serverTransport,
+ const shared_ptr<TTransportFactory>& inputTransportFactory,
+ const shared_ptr<TTransportFactory>& outputTransportFactory,
+ const shared_ptr<TProtocolFactory>& inputProtocolFactory,
+ const shared_ptr<TProtocolFactory>& outputProtocolFactory)
+ : TServer(processor, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory) {}
+
+TServerFramework::~TServerFramework() {}
+
+template<typename T>
+static void releaseOneDescriptor(const string& name, T& pTransport) {
+ if (pTransport) {
+ try {
+ pTransport->close();
+ } catch (const TTransportException& ttx) {
+ string errStr = string("TServerFramework " + name + " close failed: ") + ttx.what();
+ GlobalOutput(errStr.c_str());
+ }
+ }
+}
+
+void TServerFramework::serve() {
+ shared_ptr<TTransport> client;
+ shared_ptr<TTransport> inputTransport;
+ shared_ptr<TTransport> outputTransport;
+ shared_ptr<TProtocol> inputProtocol;
+ shared_ptr<TProtocol> outputProtocol;
+
+ // Start the server listening
+ serverTransport_->listen();
+
+ // Run the preServe event to indicate server is now listening
+ // and that it is safe to connect.
+ if (eventHandler_) {
+ eventHandler_->preServe();
+ }
+
+ // Fetch client from server
+ for (;;) {
+ try {
+ // Dereference any resources from any previous client creation
+ // such that a blocking accept does not hold them indefinitely.
+ outputProtocol.reset();
+ inputProtocol.reset();
+ outputTransport.reset();
+ inputTransport.reset();
+ client.reset();
+
+ client = serverTransport_->accept();
+
+ inputTransport = inputTransportFactory_->getTransport(client);
+ outputTransport = outputTransportFactory_->getTransport(client);
+ inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
+
+ onClientConnected(
+ shared_ptr<TConnectedClient>(
+ new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client),
+ inputProtocol, outputProtocol, eventHandler_, client),
+ bind(&TServerFramework::disposeConnectedClient, this, _1)));
+ } catch (TTransportException& ttx) {
+ releaseOneDescriptor("inputTransport", inputTransport);
+ releaseOneDescriptor("outputTransport", outputTransport);
+ releaseOneDescriptor("client", client);
+ if (ttx.getType() == TTransportException::TIMED_OUT) {
+ // Accept timeout - continue processing.
+ continue;
+ } else if (ttx.getType() == TTransportException::END_OF_FILE ||
+ ttx.getType() == TTransportException::INTERRUPTED) {
+ // Server was interrupted. This only happens when stopping.
+ break;
+ } else {
+ // All other transport exceptions are logged.
+ // State of connection is unknown. Done.
+ string errStr = string("TServerTransport died: ") + ttx.what();
+ GlobalOutput(errStr.c_str());
+ break;
+ }
+ }
+ }
+
+ releaseOneDescriptor("serverTransport", serverTransport_);
+}
+
+void TServerFramework::stop() {
+ serverTransport_->interrupt();
+ serverTransport_->interruptChildren();
+}
+
+void TServerFramework::disposeConnectedClient(TConnectedClient *pClient) {
+ onClientDisconnected(pClient);
+ delete pClient;
+}
+
+}
+}
+} // apache::thrift::server
+
diff --git a/lib/cpp/src/thrift/server/TServerFramework.h b/lib/cpp/src/thrift/server/TServerFramework.h
new file mode 100644
index 0000000..67d5420
--- /dev/null
+++ b/lib/cpp/src/thrift/server/TServerFramework.h
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_SERVER_TSERVERFRAMEWORK_H_
+#define _THRIFT_SERVER_TSERVERFRAMEWORK_H_ 1
+
+#include <boost/shared_ptr.hpp>
+#include <thrift/TProcessor.h>
+#include <thrift/server/TConnectedClient.h>
+#include <thrift/server/TServer.h>
+#include <thrift/transport/TServerTransport.h>
+#include <thrift/transport/TTransport.h>
+
+namespace apache {
+namespace thrift {
+namespace server {
+
+/**
+ * TServerFramework provides a single consolidated processing loop for
+ * servers. By having a single processing loop, behavior between servers
+ * is more predictable and maintenance cost is lowered. Implementations
+ * of TServerFramework must provide a method to deal with a client that
+ * connects and one that disconnects.
+ *
+ * While this functionality could be rolled directly into TServer, and
+ * probably should be, it would break the TServer interface contract so
+ * to maintain backwards compatibility for third party servers, no TServers
+ * were harmed in the making of this class.
+ */
+class TServerFramework : public TServer {
+public:
+ TServerFramework(
+ const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory,
+ const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory);
+
+ TServerFramework(
+ const boost::shared_ptr<apache::thrift::TProcessor>& processor,
+ const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory);
+
+ TServerFramework(
+ const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory,
+ const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& inputTransportFactory,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory);
+
+ TServerFramework(
+ const boost::shared_ptr<apache::thrift::TProcessor>& processor,
+ const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& inputTransportFactory,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory);
+
+ virtual ~TServerFramework();
+
+ /**
+ * Accept clients from the TServerTransport and add them for processing.
+ * Call stop() on another thread to interrupt processing
+ * and return control to the caller.
+ * Post-conditions (return guarantees):
+ * The serverTransport will be closed.
+ */
+ virtual void serve();
+
+ /**
+ * Interrupt serve() so that it meets post-conditions and returns.
+ */
+ virtual void stop();
+
+protected:
+ /**
+ * A client has connected. The implementation is responsible for storing
+ * and processing the client. This is called during the serve() thread,
+ * therefore a failure to return quickly will result in new client connection
+ * delays.
+ *
+ * \param[in] pClient the newly connected client
+ */
+ virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) = 0;
+
+ /**
+ * A client has disconnected.
+ * The client TTransport has already been closed.
+ * The implementation must not delete the pointer.
+ *
+ * \param[in] pClient the disconnected client
+ */
+ virtual void onClientDisconnected(TConnectedClient *pClient) = 0;
+
+private:
+ /**
+ * Smart pointer client deletion.
+ * Calls onClientDisconnected and then deletes pClient.
+ */
+ void disposeConnectedClient(TConnectedClient *pClient);
+};
+
+}
+}
+} // apache::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TSERVERFRAMEWORK_H_
diff --git a/lib/cpp/src/thrift/server/TSimpleServer.cpp b/lib/cpp/src/thrift/server/TSimpleServer.cpp
index b63c45e..a133c0d 100644
--- a/lib/cpp/src/thrift/server/TSimpleServer.cpp
+++ b/lib/cpp/src/thrift/server/TSimpleServer.cpp
@@ -17,116 +17,74 @@
* under the License.
*/
-#include <thrift/server/TConnectedClient.h>
#include <thrift/server/TSimpleServer.h>
-#include <thrift/transport/TTransportException.h>
-#include <string>
-#include <iostream>
namespace apache {
namespace thrift {
namespace server {
-using namespace std;
-using namespace apache::thrift;
-using namespace apache::thrift::protocol;
-using namespace apache::thrift::transport;
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::transport::TServerTransport;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
+using apache::thrift::transport::TTransportFactory;
using boost::shared_ptr;
+using std::string;
+
+TSimpleServer::TSimpleServer(
+ const shared_ptr<TProcessorFactory>& processorFactory,
+ const shared_ptr<TServerTransport>& serverTransport,
+ const shared_ptr<TTransportFactory>& transportFactory,
+ const shared_ptr<TProtocolFactory>& protocolFactory)
+ : TServerFramework(processorFactory, serverTransport,
+ transportFactory, protocolFactory) {}
+
+TSimpleServer::TSimpleServer(
+ const shared_ptr<TProcessor>& processor,
+ const shared_ptr<TServerTransport>& serverTransport,
+ const shared_ptr<TTransportFactory>& transportFactory,
+ const shared_ptr<TProtocolFactory>& protocolFactory)
+ : TServerFramework(processor, serverTransport,
+ transportFactory, protocolFactory) {}
+
+TSimpleServer::TSimpleServer(
+ const shared_ptr<TProcessorFactory>& processorFactory,
+ const shared_ptr<TServerTransport>& serverTransport,
+ const shared_ptr<TTransportFactory>& inputTransportFactory,
+ const shared_ptr<TTransportFactory>& outputTransportFactory,
+ const shared_ptr<TProtocolFactory>& inputProtocolFactory,
+ const shared_ptr<TProtocolFactory>& outputProtocolFactory)
+ : TServerFramework(processorFactory, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory) {}
+
+TSimpleServer::TSimpleServer(
+ const shared_ptr<TProcessor>& processor,
+ const shared_ptr<TServerTransport>& serverTransport,
+ const shared_ptr<TTransportFactory>& inputTransportFactory,
+ const shared_ptr<TTransportFactory>& outputTransportFactory,
+ const shared_ptr<TProtocolFactory>& inputProtocolFactory,
+ const shared_ptr<TProtocolFactory>& outputProtocolFactory)
+ : TServerFramework(processor, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory) {}
+
+TSimpleServer::~TSimpleServer() {}
/**
- * A simple single-threaded application server. Perfect for unit tests!
- *
+ * The main body of customized implementation for TSimpleServer is quite simple:
+ * When a client connects, use the serve() thread to drive it to completion thus
+ * blocking new connections.
*/
-void TSimpleServer::serve() {
-
- shared_ptr<TTransport> client;
- shared_ptr<TTransport> inputTransport;
- shared_ptr<TTransport> outputTransport;
- shared_ptr<TProtocol> inputProtocol;
- shared_ptr<TProtocol> outputProtocol;
-
- // Start the server listening
- serverTransport_->listen();
-
- // Run the preServe event
- if (eventHandler_) {
- eventHandler_->preServe();
- }
-
- // Fetch client from server
- while (!stop_) {
- try {
- client = serverTransport_->accept();
- inputTransport = inputTransportFactory_->getTransport(client);
- outputTransport = outputTransportFactory_->getTransport(client);
- inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
- outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
- } catch (TTransportException& ttx) {
- if (inputTransport) {
- inputTransport->close();
- }
- if (outputTransport) {
- outputTransport->close();
- }
- if (client) {
- client->close();
- }
- if (ttx.getType() != TTransportException::INTERRUPTED) {
- string errStr = string("TServerTransport died on accept: ") + ttx.what();
- GlobalOutput(errStr.c_str());
- }
- if (stop_) break; else continue;
- } catch (TException& tx) {
- if (inputTransport) {
- inputTransport->close();
- }
- if (outputTransport) {
- outputTransport->close();
- }
- if (client) {
- client->close();
- }
- string errStr = string("Some kind of accept exception: ") + tx.what();
- GlobalOutput(errStr.c_str());
- continue;
- } catch (const string& s) {
- if (inputTransport) {
- inputTransport->close();
- }
- if (outputTransport) {
- outputTransport->close();
- }
- if (client) {
- client->close();
- }
- string errStr = string("Some kind of accept exception: ") + s;
- GlobalOutput(errStr.c_str());
- break;
- }
-
- TConnectedClient("TSimpleServer",
- getProcessor(inputProtocol, outputProtocol, client),
- inputProtocol, outputProtocol, eventHandler_, client).run();
- }
-
- if (stop_) {
- try {
- serverTransport_->close();
- } catch (TTransportException& ttx) {
- string errStr = string("TServerTransport failed on close: ") + ttx.what();
- GlobalOutput(errStr.c_str());
- }
- stop_ = false;
- }
+void TSimpleServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) {
+ pClient->run();
}
-void TSimpleServer::stop() {
- if (!stop_) {
- stop_ = true;
- serverTransport_->interrupt();
- serverTransport_->interruptChildren();
- }
-}
+/**
+ * TSimpleServer does not track clients so there is nothing to do here.
+ */
+void TSimpleServer::onClientDisconnected(TConnectedClient *pClient) {}
}
}
diff --git a/lib/cpp/src/thrift/server/TSimpleServer.h b/lib/cpp/src/thrift/server/TSimpleServer.h
index 7b8677d..51b00e4 100644
--- a/lib/cpp/src/thrift/server/TSimpleServer.h
+++ b/lib/cpp/src/thrift/server/TSimpleServer.h
@@ -20,8 +20,7 @@
#ifndef _THRIFT_SERVER_TSIMPLESERVER_H_
#define _THRIFT_SERVER_TSIMPLESERVER_H_ 1
-#include <thrift/server/TServer.h>
-#include <thrift/transport/TServerTransport.h>
+#include <thrift/server/TServerFramework.h>
namespace apache {
namespace thrift {
@@ -30,77 +29,41 @@
/**
* This is the most basic simple server. It is single-threaded and runs a
* continuous loop of accepting a single connection, processing requests on
- * that connection until it closes, and then repeating. It is a good example
- * of how to extend the TServer interface.
+ * that connection until it closes, and then repeating.
*/
-class TSimpleServer : public TServer {
+class TSimpleServer : public TServerFramework {
public:
- template <typename ProcessorFactory>
- TSimpleServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
- const boost::shared_ptr<TServerTransport>& serverTransport,
- const boost::shared_ptr<TTransportFactory>& transportFactory,
- const boost::shared_ptr<TProtocolFactory>& protocolFactory,
- THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory))
- : TServer(processorFactory, serverTransport, transportFactory, protocolFactory), stop_(false) {}
+ TSimpleServer(const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory,
+ const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory);
- template <typename Processor>
- TSimpleServer(const boost::shared_ptr<Processor>& processor,
- const boost::shared_ptr<TServerTransport>& serverTransport,
- const boost::shared_ptr<TTransportFactory>& transportFactory,
- const boost::shared_ptr<TProtocolFactory>& protocolFactory,
- THRIFT_OVERLOAD_IF(Processor, TProcessor))
- : TServer(processor, serverTransport, transportFactory, protocolFactory), stop_(false) {}
+ TSimpleServer(const boost::shared_ptr<apache::thrift::TProcessor>& processor,
+ const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory);
- template <typename ProcessorFactory>
- TSimpleServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
- const boost::shared_ptr<TServerTransport>& serverTransport,
- const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
- const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
- const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
- const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
- THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory))
- : TServer(processorFactory,
- serverTransport,
- inputTransportFactory,
- outputTransportFactory,
- inputProtocolFactory,
- outputProtocolFactory),
- stop_(false) {}
+ TSimpleServer(const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory,
+ const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& inputTransportFactory,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory);
- template <typename Processor>
- TSimpleServer(const boost::shared_ptr<Processor>& processor,
- const boost::shared_ptr<TServerTransport>& serverTransport,
- const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
- const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
- const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
- const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
- THRIFT_OVERLOAD_IF(Processor, TProcessor))
- : TServer(processor,
- serverTransport,
- inputTransportFactory,
- outputTransportFactory,
- inputProtocolFactory,
- outputProtocolFactory),
- stop_(false) {}
+ TSimpleServer(const boost::shared_ptr<apache::thrift::TProcessor>& processor,
+ const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& inputTransportFactory,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory);
- /**
- * Process one connection at a time using the caller's thread.
- * Call stop() on another thread to interrupt processing and
- * return control to the caller.
- * Post-conditions (return guarantees):
- * The serverTransport will be closed.
- * There will be no connected client.
- */
- void serve();
-
- /**
- * Interrupt serve() so that it meets post-conditions and returns.
- */
- void stop();
+ virtual ~TSimpleServer();
protected:
- bool stop_;
+ virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */;
+ virtual void onClientDisconnected(TConnectedClient *pClient) /* override */;
};
+
}
}
} // apache::thrift::server
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
index f8ed6cf..a5f8c76 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
@@ -17,136 +17,83 @@
* under the License.
*/
-#include <thrift/thrift-config.h>
-
-#include <thrift/server/TConnectedClient.h>
#include <thrift/server/TThreadPoolServer.h>
-#include <thrift/transport/TTransportException.h>
-#include <thrift/concurrency/Thread.h>
-#include <thrift/concurrency/ThreadManager.h>
-#include <string>
-#include <iostream>
-#include <boost/make_shared.hpp>
namespace apache {
namespace thrift {
namespace server {
+using apache::thrift::concurrency::ThreadManager;
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::transport::TServerTransport;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
+using apache::thrift::transport::TTransportFactory;
using boost::shared_ptr;
-using namespace std;
-using namespace apache::thrift;
-using namespace apache::thrift::concurrency;
-using namespace apache::thrift::protocol;
-using namespace apache::thrift::transport;
+using std::string;
+
+TThreadPoolServer::TThreadPoolServer(
+ const shared_ptr<TProcessorFactory>& processorFactory,
+ const shared_ptr<TServerTransport>& serverTransport,
+ const shared_ptr<TTransportFactory>& transportFactory,
+ const shared_ptr<TProtocolFactory>& protocolFactory,
+ const shared_ptr<ThreadManager>& threadManager)
+ : TServerFramework(processorFactory, serverTransport,
+ transportFactory, protocolFactory),
+ threadManager_(threadManager),
+ timeout_(0),
+ taskExpiration_(0) {}
+
+TThreadPoolServer::TThreadPoolServer(
+ const shared_ptr<TProcessor>& processor,
+ const shared_ptr<TServerTransport>& serverTransport,
+ const shared_ptr<TTransportFactory>& transportFactory,
+ const shared_ptr<TProtocolFactory>& protocolFactory,
+ const shared_ptr<ThreadManager>& threadManager)
+ : TServerFramework(processor, serverTransport,
+ transportFactory, protocolFactory),
+ threadManager_(threadManager),
+ timeout_(0),
+ taskExpiration_(0) {}
+
+TThreadPoolServer::TThreadPoolServer(
+ const shared_ptr<TProcessorFactory>& processorFactory,
+ const shared_ptr<TServerTransport>& serverTransport,
+ const shared_ptr<TTransportFactory>& inputTransportFactory,
+ const shared_ptr<TTransportFactory>& outputTransportFactory,
+ const shared_ptr<TProtocolFactory>& inputProtocolFactory,
+ const shared_ptr<TProtocolFactory>& outputProtocolFactory,
+ const shared_ptr<ThreadManager>& threadManager)
+ : TServerFramework(processorFactory, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory),
+ threadManager_(threadManager),
+ stop_(false),
+ timeout_(0),
+ taskExpiration_(0) {}
+
+TThreadPoolServer::TThreadPoolServer(
+ const shared_ptr<TProcessor>& processor,
+ const shared_ptr<TServerTransport>& serverTransport,
+ const shared_ptr<TTransportFactory>& inputTransportFactory,
+ const shared_ptr<TTransportFactory>& outputTransportFactory,
+ const shared_ptr<TProtocolFactory>& inputProtocolFactory,
+ const shared_ptr<TProtocolFactory>& outputProtocolFactory,
+ const shared_ptr<ThreadManager>& threadManager)
+ : TServerFramework(processor, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory),
+ threadManager_(threadManager),
+ stop_(false),
+ timeout_(0),
+ taskExpiration_(0) {}
TThreadPoolServer::~TThreadPoolServer() {}
void TThreadPoolServer::serve() {
- shared_ptr<TTransport> client;
- shared_ptr<TTransport> inputTransport;
- shared_ptr<TTransport> outputTransport;
- shared_ptr<TProtocol> inputProtocol;
- shared_ptr<TProtocol> outputProtocol;
-
- // Start the server listening
- serverTransport_->listen();
-
- // Run the preServe event
- if (eventHandler_) {
- eventHandler_->preServe();
- }
-
- while (!stop_) {
- try {
- client.reset();
- inputTransport.reset();
- outputTransport.reset();
- inputProtocol.reset();
- outputProtocol.reset();
-
- // Fetch client from server
- client = serverTransport_->accept();
-
- // Make IO transports
- inputTransport = inputTransportFactory_->getTransport(client);
- outputTransport = outputTransportFactory_->getTransport(client);
- inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
- outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
-
- shared_ptr<TProcessor> processor = getProcessor(inputProtocol, outputProtocol, client);
-
- // Add to threadmanager pool
- threadManager_->add(
- boost::make_shared<TConnectedClient>(
- "TThreadPoolServer",
- getProcessor(inputProtocol, outputProtocol, client),
- inputProtocol, outputProtocol, eventHandler_, client),
- timeout_,
- taskExpiration_);
-
- } catch (TTransportException& ttx) {
- if (inputTransport) {
- inputTransport->close();
- }
- if (outputTransport) {
- outputTransport->close();
- }
- if (client) {
- client->close();
- }
- if (ttx.getType() != TTransportException::INTERRUPTED) {
- string errStr = string("TThreadPoolServer: TServerTransport died on accept: ") + ttx.what();
- GlobalOutput(errStr.c_str());
- }
- if (stop_) break; else continue;
- } catch (TException& tx) {
- if (inputTransport) {
- inputTransport->close();
- }
- if (outputTransport) {
- outputTransport->close();
- }
- if (client) {
- client->close();
- }
- string errStr = string("TThreadPoolServer: Caught TException: ") + tx.what();
- GlobalOutput(errStr.c_str());
- continue;
- } catch (const string& s) {
- if (inputTransport) {
- inputTransport->close();
- }
- if (outputTransport) {
- outputTransport->close();
- }
- if (client) {
- client->close();
- }
- string errStr = "TThreadPoolServer: Unknown exception: " + s;
- GlobalOutput(errStr.c_str());
- break;
- }
- }
-
- // If stopped manually, join the existing threads
- if (stop_) {
- try {
- serverTransport_->close();
- threadManager_->join();
- } catch (TException& tx) {
- string errStr = string("TThreadPoolServer: Exception shutting down: ") + tx.what();
- GlobalOutput(errStr.c_str());
- }
- stop_ = false;
- }
-}
-
-void TThreadPoolServer::stop() {
- if (!stop_) {
- stop_ = true;
- serverTransport_->interrupt();
- serverTransport_->interruptChildren();
- }
+ TServerFramework::serve();
+ threadManager_->join();
}
int64_t TThreadPoolServer::getTimeout() const {
@@ -164,6 +111,13 @@
void TThreadPoolServer::setTaskExpiration(int64_t value) {
taskExpiration_ = value;
}
+
+void TThreadPoolServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) {
+ threadManager_->add(pClient, timeout_, taskExpiration_);
+}
+
+void TThreadPoolServer::onClientDisconnected(TConnectedClient *pClient) {}
+
}
}
} // apache::thrift::server
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.h b/lib/cpp/src/thrift/server/TThreadPoolServer.h
index 2f93463..29e9aaf 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.h
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.h
@@ -21,115 +21,68 @@
#define _THRIFT_SERVER_TTHREADPOOLSERVER_H_ 1
#include <thrift/concurrency/ThreadManager.h>
-#include <thrift/server/TServer.h>
-#include <thrift/transport/TServerTransport.h>
-
-#include <boost/shared_ptr.hpp>
+#include <thrift/server/TServerFramework.h>
namespace apache {
namespace thrift {
namespace server {
-using apache::thrift::concurrency::ThreadManager;
-using apache::thrift::protocol::TProtocolFactory;
-using apache::thrift::transport::TServerTransport;
-using apache::thrift::transport::TTransportFactory;
-
-class TThreadPoolServer : public TServer {
+/**
+ * Manage clients using a thread pool.
+ */
+class TThreadPoolServer : public TServerFramework {
public:
- template <typename ProcessorFactory>
- TThreadPoolServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
- const boost::shared_ptr<TServerTransport>& serverTransport,
- const boost::shared_ptr<TTransportFactory>& transportFactory,
- const boost::shared_ptr<TProtocolFactory>& protocolFactory,
- const boost::shared_ptr<ThreadManager>& threadManager,
- THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory))
- : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
- threadManager_(threadManager),
- stop_(false),
- timeout_(0),
- taskExpiration_(0) {}
+ TThreadPoolServer(
+ const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory,
+ const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory,
+ const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager);
- template <typename Processor>
- TThreadPoolServer(const boost::shared_ptr<Processor>& processor,
- const boost::shared_ptr<TServerTransport>& serverTransport,
- const boost::shared_ptr<TTransportFactory>& transportFactory,
- const boost::shared_ptr<TProtocolFactory>& protocolFactory,
- const boost::shared_ptr<ThreadManager>& threadManager,
- THRIFT_OVERLOAD_IF(Processor, TProcessor))
- : TServer(processor, serverTransport, transportFactory, protocolFactory),
- threadManager_(threadManager),
- stop_(false),
- timeout_(0),
- taskExpiration_(0) {}
+ TThreadPoolServer(
+ const boost::shared_ptr<apache::thrift::TProcessor>& processor,
+ const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory,
+ const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager);
- template <typename ProcessorFactory>
- TThreadPoolServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
- const boost::shared_ptr<TServerTransport>& serverTransport,
- const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
- const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
- const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
- const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
- const boost::shared_ptr<ThreadManager>& threadManager,
- THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory))
- : TServer(processorFactory,
- serverTransport,
- inputTransportFactory,
- outputTransportFactory,
- inputProtocolFactory,
- outputProtocolFactory),
- threadManager_(threadManager),
- stop_(false),
- timeout_(0),
- taskExpiration_(0) {}
+ TThreadPoolServer(
+ const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory,
+ const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& inputTransportFactory,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory,
+ const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager);
- template <typename Processor>
- TThreadPoolServer(const boost::shared_ptr<Processor>& processor,
- const boost::shared_ptr<TServerTransport>& serverTransport,
- const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
- const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
- const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
- const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
- const boost::shared_ptr<ThreadManager>& threadManager,
- THRIFT_OVERLOAD_IF(Processor, TProcessor))
- : TServer(processor,
- serverTransport,
- inputTransportFactory,
- outputTransportFactory,
- inputProtocolFactory,
- outputProtocolFactory),
- threadManager_(threadManager),
- stop_(false),
- timeout_(0),
- taskExpiration_(0) {}
+ TThreadPoolServer(
+ const boost::shared_ptr<apache::thrift::TProcessor>& processor,
+ const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& inputTransportFactory,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory,
+ const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager);
virtual ~TThreadPoolServer();
/**
- * Process all connections that arrive using a thread pool.
- * Call stop() on another thread to interrupt processing and
- * return control to the caller.
* Post-conditions (return guarantees):
- * The serverTransport will be closed.
- * There will be no connected clients.
+ * There will be no clients connected.
*/
virtual void serve();
- /**
- * Interrupt serve() so that it meets post-conditions and returns.
- */
- virtual void stop();
-
virtual int64_t getTimeout() const;
-
virtual void setTimeout(int64_t value);
virtual int64_t getTaskExpiration() const;
-
virtual void setTaskExpiration(int64_t value);
protected:
- boost::shared_ptr<ThreadManager> threadManager_;
+ virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */;
+ virtual void onClientDisconnected(TConnectedClient *pClient) /* override */;
+
+ boost::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager_;
volatile bool stop_;
@@ -137,6 +90,7 @@
volatile int64_t taskExpiration_;
};
+
}
}
} // apache::thrift::server
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp
index 4dcdb44..440cede 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp
@@ -17,166 +17,109 @@
* under the License.
*/
-#include <boost/bind.hpp>
-#include <thrift/server/TConnectedClient.h>
-#include <thrift/server/TThreadedServer.h>
-#include <thrift/transport/TTransportException.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
-
-#include <string>
-#include <iostream>
-
-#ifdef HAVE_UNISTD_H
-#include <unistd.h>
-#endif
+#include <thrift/server/TThreadedServer.h>
namespace apache {
namespace thrift {
namespace server {
+using apache::thrift::concurrency::Synchronized;
+using apache::thrift::concurrency::Thread;
+using apache::thrift::concurrency::ThreadFactory;
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::transport::TServerTransport;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
+using apache::thrift::transport::TTransportFactory;
using boost::shared_ptr;
-using namespace std;
-using namespace apache::thrift;
-using namespace apache::thrift::protocol;
-using namespace apache::thrift::transport;
-using namespace apache::thrift::concurrency;
+using std::string;
+
+TThreadedServer::TThreadedServer(
+ const shared_ptr<TProcessorFactory>& processorFactory,
+ const shared_ptr<TServerTransport>& serverTransport,
+ const shared_ptr<TTransportFactory>& transportFactory,
+ const shared_ptr<TProtocolFactory>& protocolFactory,
+ const shared_ptr<ThreadFactory>& threadFactory)
+ : TServerFramework(processorFactory, serverTransport, transportFactory, protocolFactory),
+ threadFactory_(threadFactory) {}
+
+
+TThreadedServer::TThreadedServer(
+ const shared_ptr<TProcessor>& processor,
+ const shared_ptr<TServerTransport>& serverTransport,
+ const shared_ptr<TTransportFactory>& transportFactory,
+ const shared_ptr<TProtocolFactory>& protocolFactory,
+ const shared_ptr<ThreadFactory>& threadFactory)
+ : TServerFramework(processor, serverTransport, transportFactory, protocolFactory),
+ threadFactory_(threadFactory) {}
+
+TThreadedServer::TThreadedServer(
+ const shared_ptr<TProcessorFactory>& processorFactory,
+ const shared_ptr<TServerTransport>& serverTransport,
+ const shared_ptr<TTransportFactory>& inputTransportFactory,
+ const shared_ptr<TTransportFactory>& outputTransportFactory,
+ const shared_ptr<TProtocolFactory>& inputProtocolFactory,
+ const shared_ptr<TProtocolFactory>& outputProtocolFactory,
+ const shared_ptr<ThreadFactory>& threadFactory)
+ : TServerFramework(processorFactory, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory),
+ threadFactory_(threadFactory) {}
+
+TThreadedServer::TThreadedServer(
+ const shared_ptr<TProcessor>& processor,
+ const shared_ptr<TServerTransport>& serverTransport,
+ const shared_ptr<TTransportFactory>& inputTransportFactory,
+ const shared_ptr<TTransportFactory>& outputTransportFactory,
+ const shared_ptr<TProtocolFactory>& inputProtocolFactory,
+ const shared_ptr<TProtocolFactory>& outputProtocolFactory,
+ const shared_ptr<ThreadFactory>& threadFactory)
+ : TServerFramework(processor, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory),
+ threadFactory_(threadFactory) {}
TThreadedServer::~TThreadedServer() {}
void TThreadedServer::serve() {
+ TServerFramework::serve();
- shared_ptr<TTransport> client;
- shared_ptr<TTransport> inputTransport;
- shared_ptr<TTransport> outputTransport;
- shared_ptr<TProtocol> inputProtocol;
- shared_ptr<TProtocol> outputProtocol;
-
- // Start the server listening
- serverTransport_->listen();
-
- // Run the preServe event
- if (eventHandler_) {
- eventHandler_->preServe();
- }
-
- while (!stop_) {
- try {
- client.reset();
- inputTransport.reset();
- outputTransport.reset();
- inputProtocol.reset();
- outputProtocol.reset();
-
- // Fetch client from server
- client = serverTransport_->accept();
-
- // Make IO transports
- inputTransport = inputTransportFactory_->getTransport(client);
- outputTransport = outputTransportFactory_->getTransport(client);
- inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
- outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
-
- shared_ptr<TConnectedClient> pClient(
- new TConnectedClient("TThreadedServer",
- getProcessor(inputProtocol, outputProtocol, client),
- inputProtocol, outputProtocol, eventHandler_, client),
- boost::bind(&TThreadedServer::disposeClient, this, _1));
-
- // Create a thread for this client
- shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(pClient));
-
- // Insert thread into the set of threads
- {
- Synchronized s(clientsMonitor_);
- clients_.insert(pClient.get());
- }
-
- // Start the thread!
- thread->start();
-
- } catch (TTransportException& ttx) {
- if (inputTransport) {
- inputTransport->close();
- }
- if (outputTransport) {
- outputTransport->close();
- }
- if (client) {
- client->close();
- }
- if (ttx.getType() != TTransportException::INTERRUPTED) {
- string errStr = string("TThreadedServer: TServerTransport died on accept: ") + ttx.what();
- GlobalOutput(errStr.c_str());
- }
- if (stop_) break; else continue;
- } catch (TException& tx) {
- if (inputTransport) {
- inputTransport->close();
- }
- if (outputTransport) {
- outputTransport->close();
- }
- if (client) {
- client->close();
- }
- string errStr = string("TThreadedServer: Caught TException: ") + tx.what();
- GlobalOutput(errStr.c_str());
- continue;
- } catch (const string& s) {
- if (inputTransport) {
- inputTransport->close();
- }
- if (outputTransport) {
- outputTransport->close();
- }
- if (client) {
- client->close();
- }
- string errStr = "TThreadedServer: Unknown exception: " + s;
- GlobalOutput(errStr.c_str());
- break;
+ // Drain all clients - no more will arrive
+ try {
+ Synchronized s(clientsMonitor_);
+ while (!clients_.empty()) {
+ clientsMonitor_.wait();
}
- }
-
- // If stopped manually, make sure to close server transport
- if (stop_) {
- try {
- serverTransport_->close();
- } catch (TException& tx) {
- string errStr = string("TThreadedServer: Exception shutting down: ") + tx.what();
- GlobalOutput(errStr.c_str());
- }
- try {
- Synchronized s(clientsMonitor_);
- while (!clients_.empty()) {
- clientsMonitor_.wait();
- }
- } catch (TException& tx) {
- string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
- GlobalOutput(errStr.c_str());
- }
- stop_ = false;
+ } catch (TException& tx) {
+ string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
+ GlobalOutput(errStr.c_str());
}
}
-void TThreadedServer::stop() {
- if (!stop_) {
- stop_ = true;
- serverTransport_->interrupt();
- serverTransport_->interruptChildren();
- }
-}
+void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient)
+{
+ // Create a thread for this client
+ shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(pClient));
-void TThreadedServer::disposeClient(TConnectedClient *pClient) {
- // Remove this task from parent bookkeeping
+ // Insert thread into the set of threads
{
Synchronized s(clientsMonitor_);
- clients_.erase(pClient);
- if (clients_.empty()) {
- clientsMonitor_.notify();
- }
+ clients_.insert(pClient.get());
}
- delete pClient;
+
+ // Start the thread!
+ thread->start();
+}
+
+void TThreadedServer::onClientDisconnected(TConnectedClient *pClient) {
+ // Remove this task from parent bookkeeping
+ Synchronized s(clientsMonitor_);
+ clients_.erase(pClient);
+ if (clients_.empty()) {
+ clientsMonitor_.notify();
+ }
}
}
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h
index 5d510d6..7b66f1d 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.h
+++ b/lib/cpp/src/thrift/server/TThreadedServer.h
@@ -20,101 +20,72 @@
#ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_
#define _THRIFT_SERVER_TTHREADEDSERVER_H_ 1
-#include <set>
-#include <thrift/server/TServer.h>
-#include <thrift/transport/TServerTransport.h>
#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/concurrency/Thread.h>
-
-#include <boost/shared_ptr.hpp>
+#include <thrift/server/TServerFramework.h>
namespace apache {
namespace thrift {
namespace server {
-using apache::thrift::TProcessor;
-using apache::thrift::transport::TServerTransport;
-using apache::thrift::transport::TTransportFactory;
-using apache::thrift::concurrency::Monitor;
-using apache::thrift::concurrency::PlatformThreadFactory;
-using apache::thrift::concurrency::ThreadFactory;
+#define THRIFT_DEFAULT_THREAD_FACTORY
-class TConnectedClient;
-
-class TThreadedServer : public TServer {
+/**
+ * Manage clients using a thread pool.
+ */
+class TThreadedServer : public TServerFramework {
public:
- template <typename ProcessorFactory>
- TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
- const boost::shared_ptr<TServerTransport>& serverTransport,
- const boost::shared_ptr<TTransportFactory>& transportFactory,
- const boost::shared_ptr<TProtocolFactory>& protocolFactory,
- THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory))
- : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
- threadFactory_(new PlatformThreadFactory),
- stop_(false) {}
+ TThreadedServer(const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory,
+ const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory,
+ const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory =
+ boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+ new apache::thrift::concurrency::PlatformThreadFactory));
- template <typename ProcessorFactory>
- TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
- const boost::shared_ptr<TServerTransport>& serverTransport,
- const boost::shared_ptr<TTransportFactory>& transportFactory,
- const boost::shared_ptr<TProtocolFactory>& protocolFactory,
- const boost::shared_ptr<ThreadFactory>& threadFactory,
- THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory))
- : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
- threadFactory_(threadFactory),
- stop_(false) {}
+ TThreadedServer(const boost::shared_ptr<apache::thrift::TProcessor>& processor,
+ const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory,
+ const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory =
+ boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+ new apache::thrift::concurrency::PlatformThreadFactory));
- template <typename Processor>
- TThreadedServer(const boost::shared_ptr<Processor>& processor,
- const boost::shared_ptr<TServerTransport>& serverTransport,
- const boost::shared_ptr<TTransportFactory>& transportFactory,
- const boost::shared_ptr<TProtocolFactory>& protocolFactory,
- THRIFT_OVERLOAD_IF(Processor, TProcessor))
- : TServer(processor, serverTransport, transportFactory, protocolFactory),
- threadFactory_(new PlatformThreadFactory),
- stop_(false) {}
+ TThreadedServer(const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory,
+ const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& inputTransportFactory,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory,
+ const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory =
+ boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+ new apache::thrift::concurrency::PlatformThreadFactory));
- template <typename Processor>
- TThreadedServer(const boost::shared_ptr<Processor>& processor,
- const boost::shared_ptr<TServerTransport>& serverTransport,
- const boost::shared_ptr<TTransportFactory>& transportFactory,
- const boost::shared_ptr<TProtocolFactory>& protocolFactory,
- const boost::shared_ptr<ThreadFactory>& threadFactory,
- THRIFT_OVERLOAD_IF(Processor, TProcessor))
- : TServer(processor, serverTransport, transportFactory, protocolFactory),
- threadFactory_(threadFactory),
- stop_(false) {}
+ TThreadedServer(const boost::shared_ptr<apache::thrift::TProcessor>& processor,
+ const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& inputTransportFactory,
+ const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory,
+ const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory,
+ const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory =
+ boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+ new apache::thrift::concurrency::PlatformThreadFactory));
virtual ~TThreadedServer();
/**
- * Process all connections that arrive, each on their own
- * dedicated thread. There is no limit to the number of
- * threads or connections (see THRIFT-3084).
- * Call stop() on another thread to interrupt processing and
- * return control to the caller.
* Post-conditions (return guarantees):
- * The serverTransport will be closed.
- * There will be no connected clients.
+ * There will be no clients connected.
*/
virtual void serve();
- /**
- * Interrupt serve() so that it meets post-conditions and returns.
- */
- virtual void stop();
-
protected:
- /**
- * Smart pointer release method
- */
- virtual void disposeClient(TConnectedClient *pClient);
+ virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */;
+ virtual void onClientDisconnected(TConnectedClient *pClient) /* override */;
- boost::shared_ptr<ThreadFactory> threadFactory_;
- volatile bool stop_;
-
- Monitor clientsMonitor_;
+ boost::shared_ptr<apache::thrift::concurrency::ThreadFactory> threadFactory_;
+ apache::thrift::concurrency::Monitor clientsMonitor_;
std::set<TConnectedClient*> clients_;
};