Thrift: Modifications to PeekProcessor to be able to support nested PeekProcessors

Reviewed by: boz

Test Plan: Tested with Falcon


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665100 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/processor/PeekProcessor.cpp b/lib/cpp/src/processor/PeekProcessor.cpp
index d510a5d..21df9db 100644
--- a/lib/cpp/src/processor/PeekProcessor.cpp
+++ b/lib/cpp/src/processor/PeekProcessor.cpp
@@ -1,45 +1,63 @@
 #include "PeekProcessor.h"
 
+using namespace facebook::thrift::transport;
+using namespace facebook::thrift::protocol;
+using namespace facebook::thrift;
+
 namespace facebook { namespace thrift { namespace processor { 
 
 PeekProcessor::PeekProcessor() {
-  memoryBuffer_.reset(new facebook::thrift::transport::TMemoryBuffer());
+  memoryBuffer_.reset(new TMemoryBuffer());
+  targetTransport_ = memoryBuffer_;
 }
 PeekProcessor::~PeekProcessor() {}
 
-void PeekProcessor::initialize(boost::shared_ptr<facebook::thrift::TProcessor> actualProcessor,
-                    boost::shared_ptr<facebook::thrift::protocol::TProtocolFactory> protocolFactory,
-                    boost::shared_ptr<facebook::thrift::transport::TPipedTransportFactory> transportFactory) {
+void PeekProcessor::initialize(boost::shared_ptr<TProcessor> actualProcessor,
+                               boost::shared_ptr<TProtocolFactory> protocolFactory,
+                               boost::shared_ptr<TPipedTransportFactory> transportFactory) {
   actualProcessor_ = actualProcessor;
-  pipedProtocol_ = protocolFactory->getProtocol(memoryBuffer_);
+  pipedProtocol_ = protocolFactory->getProtocol(targetTransport_);
   transportFactory_ = transportFactory;
-  transportFactory_->initializeTargetTransport(memoryBuffer_);
+  transportFactory_->initializeTargetTransport(targetTransport_);
 }
 
-boost::shared_ptr<facebook::thrift::transport::TTransport> PeekProcessor::getPipedTransport(boost::shared_ptr<facebook::thrift::transport::TTransport> in) {
+boost::shared_ptr<TTransport> PeekProcessor::getPipedTransport(boost::shared_ptr<TTransport> in) {
   return transportFactory_->getTransport(in);
 }
 
-bool PeekProcessor::process(boost::shared_ptr<facebook::thrift::protocol::TProtocol> in, 
-                            boost::shared_ptr<facebook::thrift::protocol::TProtocol> out) {
+void PeekProcessor::setTargetTransport(boost::shared_ptr<TTransport> targetTransport) {
+  targetTransport_ = targetTransport;
+  if (boost::dynamic_pointer_cast<TMemoryBuffer>(targetTransport_)) {
+    memoryBuffer_ = boost::dynamic_pointer_cast<TMemoryBuffer>(targetTransport);
+  } else if (boost::dynamic_pointer_cast<TPipedTransport>(targetTransport_)) {
+    memoryBuffer_ = boost::dynamic_pointer_cast<TMemoryBuffer>(boost::dynamic_pointer_cast<TPipedTransport>(targetTransport_)->getTargetTransport());
+  }
+
+  if (!memoryBuffer_) {
+    throw TException("Target transport must be a TMemoryBuffer or a TPipedTransport with TMemoryBuffer");
+  }
+}
+
+bool PeekProcessor::process(boost::shared_ptr<TProtocol> in, 
+                            boost::shared_ptr<TProtocol> out) {
 
   std::string fname;
-  facebook::thrift::protocol::TMessageType mtype;
+  TMessageType mtype;
   int32_t seqid;
   in->readMessageBegin(fname, mtype, seqid);
 
-  if (mtype != facebook::thrift::protocol::T_CALL) {
-    throw facebook::thrift::TException("Unexpected message type");
+  if (mtype != T_CALL) {
+    throw TException("Unexpected message type");
   }
 
   // Peek at the name
   peekName(fname);
 
-  facebook::thrift::protocol::TType ftype;
+  TType ftype;
   int16_t fid;
   while (true) {
     in->readFieldBegin(fname, ftype, fid);
-    if (ftype == facebook::thrift::protocol::T_STOP) {
+    if (ftype == T_STOP) {
       break;
     }
 
@@ -73,9 +91,9 @@
 void PeekProcessor::peekBuffer(uint8_t* buffer, uint32_t size) {
 }
 
-void PeekProcessor::peek(boost::shared_ptr<facebook::thrift::protocol::TProtocol> in, 
-                  facebook::thrift::protocol::TType ftype,
-                  int16_t fid) {
+void PeekProcessor::peek(boost::shared_ptr<TProtocol> in, 
+                         TType ftype,
+                         int16_t fid) {
   in->skip(ftype);
 }
 
diff --git a/lib/cpp/src/processor/PeekProcessor.h b/lib/cpp/src/processor/PeekProcessor.h
index d1d227c..6b8414b 100644
--- a/lib/cpp/src/processor/PeekProcessor.h
+++ b/lib/cpp/src/processor/PeekProcessor.h
@@ -37,6 +37,8 @@
 
   boost::shared_ptr<facebook::thrift::transport::TTransport> getPipedTransport(boost::shared_ptr<facebook::thrift::transport::TTransport> in);
 
+  void setTargetTransport(boost::shared_ptr<facebook::thrift::transport::TTransport> targetTransport);
+
   virtual bool process(boost::shared_ptr<facebook::thrift::protocol::TProtocol> in, 
                        boost::shared_ptr<facebook::thrift::protocol::TProtocol> out);
 
@@ -54,6 +56,7 @@
   boost::shared_ptr<facebook::thrift::protocol::TProtocol> pipedProtocol_;
   boost::shared_ptr<facebook::thrift::transport::TPipedTransportFactory> transportFactory_;
   boost::shared_ptr<facebook::thrift::transport::TMemoryBuffer> memoryBuffer_;
+  boost::shared_ptr<facebook::thrift::transport::TTransport> targetTransport_;
 };
 
 }}} // facebook::thrift::processor
diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h
index 653939f..cb9a95f 100644
--- a/lib/cpp/src/transport/TTransportUtils.h
+++ b/lib/cpp/src/transport/TTransportUtils.h
@@ -425,6 +425,7 @@
   void readEnd() {
     if (pipeOnRead_) {
       dstTrans_->write(rBuf_, rLen_);
+      dstTrans_->flush();
     }
 
     // reset state
@@ -437,11 +438,16 @@
   void writeEnd() {
     if (pipeOnWrite_) {
       dstTrans_->write(wBuf_, wLen_);
+      dstTrans_->flush();
     }
   }
 
   void flush();
 
+  boost::shared_ptr<TTransport> getTargetTransport() {
+    return dstTrans_;
+  } 
+
  protected:
   boost::shared_ptr<TTransport> srcTrans_;
   boost::shared_ptr<TTransport> dstTrans_;