THRIFT-1314. cpp: add TProcessorFactory

Patch: Adam Simpkins

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1164190 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index b817260..8d72a15 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -88,6 +88,9 @@
   /// Server handle
   TNonblockingServer* server_;
 
+  /// Processor used to handle calls on this connection
+  boost::shared_ptr<TProcessor> processor_;
+
   /// Object wrapping network socket
   boost::shared_ptr<TSocket> tSocket_;
 
@@ -420,6 +423,9 @@
   } else {
     connectionContext_ = NULL;
   }
+
+  // Fetch this connection's processor once, up front, rather than per call
+  processor_ = s->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
 }
 
 void TNonblockingServer::TConnection::workSocket() {
@@ -572,7 +578,7 @@
 
       // Create task and dispatch to the thread manager
       boost::shared_ptr<Runnable> task =
-        boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
+        boost::shared_ptr<Runnable>(new Task(processor_,
                                              inputProtocol_,
                                              outputProtocol_,
                                              this));
@@ -595,8 +601,8 @@
     } else {
       try {
         // Invoke the processor
-        server_->getProcessor()->process(inputProtocol_, outputProtocol_,
-                                         connectionContext_);
+        processor_->process(inputProtocol_, outputProtocol_,
+                            connectionContext_);
       } catch (const TTransportException &ttx) {
         GlobalOutput.printf("TNonblockingServer transport error in "
                             "process(): %s", ttx.what());
diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h
index 4dddfea..6bd1398 100644
--- a/lib/cpp/src/server/TServer.h
+++ b/lib/cpp/src/server/TServer.h
@@ -112,8 +112,8 @@
     serve();
   }
 
-  boost::shared_ptr<TProcessor> getProcessor() {
-    return processor_;
+  boost::shared_ptr<TProcessorFactory> getProcessorFactory() {
+    return processorFactory_;
   }
 
   boost::shared_ptr<TServerTransport> getServerTransport() {
@@ -142,7 +142,7 @@
 
 protected:
   TServer(boost::shared_ptr<TProcessor> processor):
-    processor_(processor) {
+    processorFactory_(new TSingletonProcessorFactory(processor)) {
     setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setInputProtocolFactory(boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
@@ -151,7 +151,7 @@
 
   TServer(boost::shared_ptr<TProcessor> processor,
           boost::shared_ptr<TServerTransport> serverTransport):
-    processor_(processor),
+    processorFactory_(new TSingletonProcessorFactory(processor)),
     serverTransport_(serverTransport) {
     setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
@@ -163,7 +163,7 @@
           boost::shared_ptr<TServerTransport> serverTransport,
           boost::shared_ptr<TTransportFactory> transportFactory,
           boost::shared_ptr<TProtocolFactory> protocolFactory):
-    processor_(processor),
+    processorFactory_(new TSingletonProcessorFactory(processor)),
     serverTransport_(serverTransport),
     inputTransportFactory_(transportFactory),
     outputTransportFactory_(transportFactory),
@@ -176,16 +176,33 @@
           boost::shared_ptr<TTransportFactory> outputTransportFactory,
           boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
           boost::shared_ptr<TProtocolFactory> outputProtocolFactory):
-    processor_(processor),
+    processorFactory_(new TSingletonProcessorFactory(processor)),
     serverTransport_(serverTransport),
     inputTransportFactory_(inputTransportFactory),
     outputTransportFactory_(outputTransportFactory),
     inputProtocolFactory_(inputProtocolFactory),
     outputProtocolFactory_(outputProtocolFactory) {}
 
+  /**
+   * Get the TProcessor that will handle calls on one connection.
+   *
+   * Bundles the protocols and transport into a TConnectionInfo and asks
+   * processorFactory_ for a processor.  Call this once per connection (never
+   * once per call) so the factory may return a different processor each time.
+   */
+  boost::shared_ptr<TProcessor> getProcessor(
+      boost::shared_ptr<TProtocol> inputProtocol,
+      boost::shared_ptr<TProtocol> outputProtocol,
+      boost::shared_ptr<TTransport> transport) {
+    TConnectionInfo connInfo;
+    connInfo.input = inputProtocol;
+    connInfo.output = outputProtocol;
+    connInfo.transport = transport;
+    return processorFactory_->getProcessor(connInfo);
+  }
 
   // Class variables
-  boost::shared_ptr<TProcessor> processor_;
+  boost::shared_ptr<TProcessorFactory> processorFactory_;
   boost::shared_ptr<TServerTransport> serverTransport_;
 
   boost::shared_ptr<TTransportFactory> inputTransportFactory_;
diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp
index 229261f..b13b976 100644
--- a/lib/cpp/src/server/TSimpleServer.cpp
+++ b/lib/cpp/src/server/TSimpleServer.cpp
@@ -87,6 +87,10 @@
       break;
     }
 
+    // Get the processor that will handle every call on this connection
+    shared_ptr<TProcessor> processor = getProcessor(inputProtocol,
+                                                    outputProtocol, client);
+
     void* connectionContext = NULL;
     if (eventHandler_ != NULL) {
       connectionContext = eventHandler_->createContext(inputProtocol, outputProtocol);
@@ -96,8 +100,9 @@
         if (eventHandler_ != NULL) {
           eventHandler_->processContext(connectionContext, client);
         }
-        if (!processor_->process(inputProtocol, outputProtocol, connectionContext) ||
-            // Peek ahead, is the remote side closed?
+        if (!processor->process(inputProtocol, outputProtocol,
+                                connectionContext) ||
+            // Peek ahead, is the remote side closed?
             !inputProtocol->getTransport()->peek()) {
           break;
         }
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
index 2204076..20183f1 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -170,8 +170,13 @@
       inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
       outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
 
+      shared_ptr<TProcessor> processor = getProcessor(inputProtocol,
+                                                      outputProtocol, client);
+
       // Add to threadmanager pool
-      threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol, client)), timeout_);
+      shared_ptr<TThreadPoolServer::Task> task(new TThreadPoolServer::Task(
+            *this, processor, inputProtocol, outputProtocol, client));
+      threadManager_->add(task, timeout_);
 
     } catch (TTransportException& ttx) {
       if (inputTransport != NULL) { inputTransport->close(); }
diff --git a/lib/cpp/src/server/TThreadedServer.cpp b/lib/cpp/src/server/TThreadedServer.cpp
index feeb0e9..43d1df2 100644
--- a/lib/cpp/src/server/TThreadedServer.cpp
+++ b/lib/cpp/src/server/TThreadedServer.cpp
@@ -180,8 +180,11 @@
       inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
       outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
 
+      shared_ptr<TProcessor> processor = getProcessor(inputProtocol,
+                                                      outputProtocol, client);
+
       TThreadedServer::Task* task = new TThreadedServer::Task(*this,
-                                                              processor_,
+                                                              processor,
                                                               inputProtocol,
                                                               outputProtocol,
                                                               client);