THRIFT-1314. cpp: add TProcessorFactory
Patch: Adam Simpkins
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1164190 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h
index a593776..26c3ee4 100644
--- a/lib/cpp/src/TProcessor.h
+++ b/lib/cpp/src/TProcessor.h
@@ -189,6 +189,45 @@
boost::shared_ptr<HandlerFactory_> handlerFactory_;
};
+struct TConnectionInfo {
+ // The input and output protocols
+ boost::shared_ptr<protocol::TProtocol> input;
+ boost::shared_ptr<protocol::TProtocol> output;
+ // The underlying transport used for the connection
+ // This is the transport that was returned by TServerTransport::accept(),
+ // and it may be different than the transport pointed to by the input and
+ // output protocols.
+ boost::shared_ptr<transport::TTransport> transport;
+};
+
+class TProcessorFactory {
+ public:
+ virtual ~TProcessorFactory() {}
+
+ /**
+ * Get the TProcessor to use for a particular connection.
+ *
+ * This method is always invoked in the same thread that the connection was
+ * accepted on. This generally means that this call does not need to be
+ * thread safe, as it will always be invoked from a single thread.
+ */
+ virtual boost::shared_ptr<TProcessor> getProcessor(
+ const TConnectionInfo& connInfo) = 0;
+};
+
+class TSingletonProcessorFactory : public TProcessorFactory {
+ public:
+ TSingletonProcessorFactory(boost::shared_ptr<TProcessor> processor) :
+ processor_(processor) {}
+
+ boost::shared_ptr<TProcessor> getProcessor(const TConnectionInfo&) {
+ return processor_;
+ }
+
+ private:
+ boost::shared_ptr<TProcessor> processor_;
+};
+
}} // apache::thrift
#endif // #ifndef _THRIFT_TPROCESSOR_H_
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index b817260..8d72a15 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -88,6 +88,9 @@
/// Server handle
TNonblockingServer* server_;
+ /// TProcessor
+ boost::shared_ptr<TProcessor> processor_;
+
/// Object wrapping network socket
boost::shared_ptr<TSocket> tSocket_;
@@ -420,6 +423,9 @@
} else {
connectionContext_ = NULL;
}
+
+ // Get the processor
+ processor_ = s->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}
void TNonblockingServer::TConnection::workSocket() {
@@ -572,7 +578,7 @@
// Create task and dispatch to the thread manager
boost::shared_ptr<Runnable> task =
- boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
+ boost::shared_ptr<Runnable>(new Task(processor_,
inputProtocol_,
outputProtocol_,
this));
@@ -595,8 +601,8 @@
} else {
try {
// Invoke the processor
- server_->getProcessor()->process(inputProtocol_, outputProtocol_,
- connectionContext_);
+ processor_->process(inputProtocol_, outputProtocol_,
+ connectionContext_);
} catch (const TTransportException &ttx) {
GlobalOutput.printf("TNonblockingServer transport error in "
"process(): %s", ttx.what());
diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h
index 4dddfea..6bd1398 100644
--- a/lib/cpp/src/server/TServer.h
+++ b/lib/cpp/src/server/TServer.h
@@ -112,8 +112,8 @@
serve();
}
- boost::shared_ptr<TProcessor> getProcessor() {
- return processor_;
+ boost::shared_ptr<TProcessorFactory> getProcessorFactory() {
+ return processorFactory_;
}
boost::shared_ptr<TServerTransport> getServerTransport() {
@@ -142,7 +142,7 @@
protected:
TServer(boost::shared_ptr<TProcessor> processor):
- processor_(processor) {
+ processorFactory_(new TSingletonProcessorFactory(processor)) {
setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
@@ -151,7 +151,7 @@
TServer(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TServerTransport> serverTransport):
- processor_(processor),
+ processorFactory_(new TSingletonProcessorFactory(processor)),
serverTransport_(serverTransport) {
setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
@@ -163,7 +163,7 @@
boost::shared_ptr<TServerTransport> serverTransport,
boost::shared_ptr<TTransportFactory> transportFactory,
boost::shared_ptr<TProtocolFactory> protocolFactory):
- processor_(processor),
+ processorFactory_(new TSingletonProcessorFactory(processor)),
serverTransport_(serverTransport),
inputTransportFactory_(transportFactory),
outputTransportFactory_(transportFactory),
@@ -176,16 +176,33 @@
boost::shared_ptr<TTransportFactory> outputTransportFactory,
boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
boost::shared_ptr<TProtocolFactory> outputProtocolFactory):
- processor_(processor),
+ processorFactory_(new TSingletonProcessorFactory(processor)),
serverTransport_(serverTransport),
inputTransportFactory_(inputTransportFactory),
outputTransportFactory_(outputTransportFactory),
inputProtocolFactory_(inputProtocolFactory),
outputProtocolFactory_(outputProtocolFactory) {}
+ /**
+ * Get a TProcessor to handle calls on a particular connection.
+ *
+ * This method should only be called once per connection (never once per
+ * call). This allows the TProcessorFactory to return a different processor
+ * for each connection if it desires.
+ */
+ boost::shared_ptr<TProcessor> getProcessor(
+ boost::shared_ptr<TProtocol> inputProtocol,
+ boost::shared_ptr<TProtocol> outputProtocol,
+ boost::shared_ptr<TTransport> transport) {
+ TConnectionInfo connInfo;
+ connInfo.input = inputProtocol;
+ connInfo.output = outputProtocol;
+ connInfo.transport = transport;
+ return processorFactory_->getProcessor(connInfo);
+ }
// Class variables
- boost::shared_ptr<TProcessor> processor_;
+ boost::shared_ptr<TProcessorFactory> processorFactory_;
boost::shared_ptr<TServerTransport> serverTransport_;
boost::shared_ptr<TTransportFactory> inputTransportFactory_;
diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp
index 229261f..b13b976 100644
--- a/lib/cpp/src/server/TSimpleServer.cpp
+++ b/lib/cpp/src/server/TSimpleServer.cpp
@@ -87,6 +87,10 @@
break;
}
+ // Get the processor
+ shared_ptr<TProcessor> processor = getProcessor(inputProtocol,
+ outputProtocol, client);
+
void* connectionContext = NULL;
if (eventHandler_ != NULL) {
connectionContext = eventHandler_->createContext(inputProtocol, outputProtocol);
@@ -96,8 +100,9 @@
if (eventHandler_ != NULL) {
eventHandler_->processContext(connectionContext, client);
}
- if (!processor_->process(inputProtocol, outputProtocol, connectionContext) ||
- // Peek ahead, is the remote side closed?
+ if (!processor->process(inputProtocol, outputProtocol,
+ connectionContext) ||
+ // Peek ahead, is the remote side closed?
!inputProtocol->getTransport()->peek()) {
break;
}
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
index 2204076..20183f1 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -170,8 +170,13 @@
inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
+ shared_ptr<TProcessor> processor = getProcessor(inputProtocol,
+ outputProtocol, client);
+
// Add to threadmanager pool
- threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol, client)), timeout_);
+ shared_ptr<TThreadPoolServer::Task> task(new TThreadPoolServer::Task(
+ *this, processor, inputProtocol, outputProtocol, client));
+ threadManager_->add(task, timeout_);
} catch (TTransportException& ttx) {
if (inputTransport != NULL) { inputTransport->close(); }
diff --git a/lib/cpp/src/server/TThreadedServer.cpp b/lib/cpp/src/server/TThreadedServer.cpp
index feeb0e9..43d1df2 100644
--- a/lib/cpp/src/server/TThreadedServer.cpp
+++ b/lib/cpp/src/server/TThreadedServer.cpp
@@ -180,8 +180,11 @@
inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
+ shared_ptr<TProcessor> processor = getProcessor(inputProtocol,
+ outputProtocol, client);
+
TThreadedServer::Task* task = new TThreadedServer::Task(*this,
- processor_,
+ processor,
inputProtocol,
outputProtocol,
client);