-- Nonblocking server changes to allow logging

Summary:
-- the constructor needs to accept a transport factory
-- TConnection close() needs to close factor generated transports

Reviewed By: Mark Slee

Test Plan: Tested with search redologger


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664930 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 9fb5c23..5755514 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -9,7 +9,7 @@
 
 namespace facebook { namespace thrift { namespace server { 
 
-void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
+  void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
   socket_ = socket;
   server_ = s;
   appState_ = APP_INIT;
@@ -27,6 +27,18 @@
   
   // Set flags, which also registers the event
   setFlags(eventFlags);
+
+  // TODO: this needs to be replaced by the new version of TTransportFactory
+  factoryInputTransport_ = (s->getTransportFactory()->getIOTransports(inputTransport_)).first;
+  //  factoryOutputTransport_ = (transportFactory->getIOTransports(outputTransport_)).first;
+
+  // Create protocol
+  std::pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
+  iop = s->getProtocolFactory()->getIOProtocols(factoryInputTransport_ ,
+                                                outputTransport_);
+  inputProtocol_ = iop.first;
+  outputProtocol_ = iop.second;
+
 }
 
 void TConnection::workSocket() {
@@ -152,11 +164,11 @@
       // Invoke the processor
       server_->getProcessor()->process(inputProtocol_, outputProtocol_);
     } catch (TTransportException &ttx) {
-      fprintf(stderr, "Server::process() %s\n", ttx.what());
+      fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
       close();
       return;
     } catch (TException &x) {
-      fprintf(stderr, "Server::process() %s\n", x.what());
+      fprintf(stderr, "TException: Server::process() %s\n", x.what());
       close();     
       return;
     } catch (...) {
@@ -339,6 +351,10 @@
   }
   socket_ = 0;
 
+  // close any factory produced transports
+  factoryInputTransport_->close();
+  //  factoryOutputTransport_->close();
+
   // Give this object back to the server that owns it
   server_->returnConnection(this);
 }
@@ -350,7 +366,7 @@
 TConnection* TNonblockingServer::createConnection(int socket, short flags) {
   // Check the stack
   if (connectionStack_.empty()) {
-    return new TConnection(socket, flags, this);
+    return new TConnection(socket, flags, this, this->getTransportFactory());
   } else {
     TConnection* result = connectionStack_.top();
     connectionStack_.pop();
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 11b58b1..c8bfcb5 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -50,12 +50,23 @@
   void handleEvent(int fd, short which);
 
  public:
-  TNonblockingServer(shared_ptr<TProcessor> processor, int port) :
-    TServer(processor),
+  TNonblockingServer(shared_ptr<TProcessor> processor, 
+                     shared_ptr<TProtocolFactory> protocolFactory,
+                     int port) :
+    TServer(processor, protocolFactory),
     serverSocket_(0),
     port_(port),
     frameResponses_(true) {}
-    
+
+  TNonblockingServer(shared_ptr<TProcessor> processor, 
+                     shared_ptr<TProtocolFactory>  protocolFactory,
+                     shared_ptr<TTransportFactory> transportFactory,
+                     int port) :
+    TServer(processor, protocolFactory, transportFactory),
+    serverSocket_(0),
+    port_(port),
+    frameResponses_(true) {}
+        
   ~TNonblockingServer() {}
 
   void setFrameResponses(bool frameResponses) {
@@ -155,6 +166,10 @@
   // Transport that processor writes to
   shared_ptr<TMemoryBuffer> outputTransport_;
 
+  // extra transport generated by transport factory (e.g. BufferedRouterTransport)
+  shared_ptr<TTransport> factoryInputTransport_;
+  //  shared_ptr<TTransport> factoryOutputTransport_;
+
   // Protocol encoder
   shared_ptr<TProtocol> outputProtocol_;
 
@@ -183,7 +198,8 @@
  public:
 
   // Constructor
-  TConnection(int socket, short eventFlags, TNonblockingServer *s) {
+  TConnection(int socket, short eventFlags, TNonblockingServer *s, 
+              shared_ptr<TTransportFactory> transportFactory) {
     readBuffer_ = (uint8_t*)malloc(1024);
     if (readBuffer_ == NULL) {
       throw new facebook::thrift::TException("Out of memory.");
@@ -191,15 +207,11 @@
     readBufferSize_ = 1024;
     
     // Allocate input and output tranpsorts
+    // these only need to be allocated once per TConnection (they don't need to be 
+    // reallocated on init() call)
     inputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
     outputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
-    
-    // Create protocol
-    std::pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
-    iop = s->getProtocolFactory()->getIOProtocols(inputTransport_, outputTransport_);
-    inputProtocol_ = iop.first;
-    outputProtocol_ = iop.second;
-
+        
     init(socket, eventFlags, s);
   }
 
diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h
index 4b20432..b9f4fca 100644
--- a/lib/cpp/src/server/TServer.h
+++ b/lib/cpp/src/server/TServer.h
@@ -33,11 +33,20 @@
   shared_ptr<TProcessor> getProcessor() {
     return processor_;
   }
+
+  shared_ptr<TServerTransport> getServerTransport() {
+    return serverTransport_;
+  }
+
+  shared_ptr<TTransportFactory> getTransportFactory() {
+    return transportFactory_;
+  }
   
   shared_ptr<TProtocolFactory> getProtocolFactory() {
     return protocolFactory_;
   }
 
+
 protected:
   TServer(shared_ptr<TProcessor> processor,
           shared_ptr<TServerTransport> serverTransport,
@@ -70,6 +79,27 @@
     transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
     protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
   }
+
+  TServer(shared_ptr<TProcessor> processor, 
+          shared_ptr<TTransportFactory> transportFactory) :
+    processor_(processor),
+    transportFactory_(transportFactory) {
+    protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
+  }
+
+  TServer(shared_ptr<TProcessor> processor, 
+          shared_ptr<TProtocolFactory> protocolFactory) :
+    processor_(processor) {
+    transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
+    protocolFactory_ = protocolFactory;
+  }
+
+  TServer(shared_ptr<TProcessor>        processor,           
+          shared_ptr<TProtocolFactory>  protocolFactory,
+          shared_ptr<TTransportFactory> transportFactory):
+    processor_(processor),
+    transportFactory_(transportFactory),
+    protocolFactory_(protocolFactory) {}
  
   shared_ptr<TProcessor> processor_;
   shared_ptr<TServerTransport> serverTransport_;