Thrift TTransportFactory model for servers

Summary: Servers need to create bufferedtransports etc. around the transports they get in a user-definable way. So use a factory pattern to allow the user to supply an object to the server that defines this behavior.


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664792 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TThreadPoolServer.cc b/lib/cpp/src/server/TThreadPoolServer.cc
index d53d174..1eab53d 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cc
+++ b/lib/cpp/src/server/TThreadPoolServer.cc
@@ -1,5 +1,4 @@
 #include "server/TThreadPoolServer.h"
-#include "transport/TBufferedTransport.h"
 #include "transport/TTransportException.h"
 #include "concurrency/Thread.h"
 #include "concurrency/ThreadManager.h"
@@ -15,54 +14,52 @@
 class TThreadPoolServer::Task: public Runnable {
     
   shared_ptr<TProcessor> _processor;
-  shared_ptr<TTransport> _transport;
-  shared_ptr<TBufferedTransport> _bufferedTransport;
+  shared_ptr<TTransport> _input;
+  shared_ptr<TTransport> _output;
     
 public:
     
   Task(shared_ptr<TProcessor> processor,
-       shared_ptr<TTransport> transport) :
+       shared_ptr<TTransport> input,
+       shared_ptr<TTransport> output) :
     _processor(processor),
-    _transport(transport),
-    _bufferedTransport(new TBufferedTransport(transport)) {
+    _input(input),
+    _output(output) {
   }
 
   ~Task() {}
     
-  void run() {
-      
+  void run() {     
     while(true) {
-	
       try {
-	_processor->process(_bufferedTransport);
-	
+	_processor->process(_input, _output);
       } catch (TTransportException& ttx) {
-	
-	break;
-	
+        break;
       } catch(...) {
-	
-	break;
+        break;
       }
     }
-    
-    _bufferedTransport->close();
+    _input->close();
+    _output->close();
   }
 };
   
 TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
-				     shared_ptr<TServerOptions> options,
-				     shared_ptr<TServerTransport> serverTransport,
-				     shared_ptr<ThreadManager> threadManager) :
-  TServer(processor, options), 
-  serverTransport_(serverTransport), 
+                                     shared_ptr<TServerTransport> serverTransport,
+                                     shared_ptr<TTransportFactory> transportFactory,
+                                     shared_ptr<ThreadManager> threadManager,
+                                     shared_ptr<TServerOptions> options) :
+  TServer(processor, serverTransport, transportFactory, options), 
   threadManager_(threadManager) {
 }
-    
+
 TThreadPoolServer::~TThreadPoolServer() {}
 
 void TThreadPoolServer::run() {
 
+  shared_ptr<TTransport> client;
+  pair<shared_ptr<TTransport>,shared_ptr<TTransport> > io;
+
   try {
     // Start the server listening
     serverTransport_->listen();
@@ -71,15 +68,14 @@
     return;
   }
   
-  // Fetch client from server
-  
-  while (true) {
-    
+  while (true) {   
     try {
-      
-      threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, 
-											  shared_ptr<TTransport>(serverTransport_->accept()))));
-      
+      // Fetch client from server
+      client = serverTransport_->accept();
+      // Make IO transports
+      io = transportFactory_->getIOTransports(client);
+      // Add to threadmanager pool
+      threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, io.first, io.second)));
     } catch (TTransportException& ttx) {
       break;
     }