-- TBufferedRouterTransport being renamed to TPipedTransport

Summary:
- TBufferedRouterTransport is the most nonsensical name I have ever
  heard of

Reviewed By: slee

Test Plan: it compiles


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664979 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.cpp b/lib/cpp/src/transport/TBufferedRouterTransport.cpp
deleted file mode 100644
index 60ab594..0000000
--- a/lib/cpp/src/transport/TBufferedRouterTransport.cpp
+++ /dev/null
@@ -1,76 +0,0 @@
-#include "TBufferedRouterTransport.h"
-#include "Thrift.h"
-using std::string;
-
-namespace facebook { namespace thrift { namespace transport { 
-
-uint32_t TBufferedRouterTransport::read(uint8_t* buf, uint32_t len) {
-  uint32_t need = len;
-  
-  // We don't have enough data yet
-  if (rLen_-rPos_ < need) {
-    // Copy out whatever we have
-    if (rLen_-rPos_ > 0) {
-      memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
-      need -= rLen_-rPos_;
-      buf += rLen_-rPos_;
-      rPos_ = rLen_;
-    }
-
-    // Double the size of the underlying buffer if it is full
-    if (rLen_ == rBufSize_) {
-      rBufSize_ *=2;
-      rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
-    }
-    
-    // try to fill up the buffer
-    rLen_ += trans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
-  }
-
-
-  // Hand over whatever we have
-  uint32_t give = need;
-  if (rLen_-rPos_ < give) {
-    give = rLen_-rPos_;
-  }
-  if (give > 0) {
-    memcpy(buf, rBuf_+rPos_, give);
-    rPos_ += give;
-    need -= give;
-  }
-
-  return (len - need);
-}
-
-void TBufferedRouterTransport::write(const uint8_t* buf, uint32_t len) {
-  if (len == 0) {
-    return;
-  }
-
-  // Make the buffer as big as it needs to be
-  if ((len + wLen_) >= wBufSize_) {
-    uint32_t newBufSize = wBufSize_*2;
-    while ((len + wLen_) >= newBufSize) {
-      newBufSize *= 2;
-    }
-    wBuf_ = (uint8_t *)realloc(wBuf_, sizeof(uint8_t) * newBufSize);
-    wBufSize_ = newBufSize;
-  }
-
-  // Copy into the buffer
-  memcpy(wBuf_ + wLen_, buf, len);
-  wLen_ += len;
-}
-
-void TBufferedRouterTransport::flush()  {
-  // Write out any data waiting in the write buffer
-  if (wLen_ > 0) {
-    trans_->write(wBuf_, wLen_);
-    wLen_ = 0;
-  }
-
-  // Flush the underlying transport
-  trans_->flush();
-}
-
-}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.h b/lib/cpp/src/transport/TBufferedRouterTransport.h
deleted file mode 100644
index 0b4577c..0000000
--- a/lib/cpp/src/transport/TBufferedRouterTransport.h
+++ /dev/null
@@ -1,129 +0,0 @@
-#ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_
-#define _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_ 1
-
-#include "TTransport.h"
-#include "Thrift.h"
-#include <string>
-
-#include <boost/shared_ptr.hpp>
-
-namespace facebook { namespace thrift { namespace transport { 
-                         
-using namespace boost;
-
-/**
- * BufferedRouterTransport. Funcationally equivalent to TBufferedTransport
- * but routes the request to another Transport (typical use case is to route
- * the request to TFileTransport to store the request on disk). The
- * underlying buffer expands to a keep a copy of the entire request/response.
- *
- * @author Aditya Agarwal <aditya@facebook.com>
- */
-class TBufferedRouterTransport : public TTransport {
- public:
-  TBufferedRouterTransport(shared_ptr<TTransport> trans, shared_ptr<TTransport> rtrans) :
-    trans_(trans),
-    rtrans_(rtrans),
-    rBufSize_(512), rPos_(0), rLen_(0),
-    wBufSize_(512), wLen_(0) {
-
-    rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_);
-    wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_);
-  }
-    
-  TBufferedRouterTransport(shared_ptr<TTransport> trans, shared_ptr<TTransport> rtrans, uint32_t sz) :
-    trans_(trans),
-    rtrans_(rtrans),
-    rBufSize_(512), rPos_(0), rLen_(0),
-    wBufSize_(sz), wLen_(0) {
-
-    rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_);
-    wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_);
-  }
-
-  ~TBufferedRouterTransport() {
-    free(rBuf_);
-    free(wBuf_);
-  }
-
-  bool isOpen() {
-    return trans_->isOpen();
-  }
-  
-  bool peek() {    
-    if (rPos_ >= rLen_) {
-      // Double the size of the underlying buffer if it is full
-      if (rLen_ == rBufSize_) {
-        rBufSize_ *=2;
-        rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
-      }
-    
-      // try to fill up the buffer
-      rLen_ += trans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
-    }
-    return (rLen_ > rPos_);
-  }
-
-
-  void open() {
-    trans_->open();
-  }
-
-  void close() {
-    trans_->close();
-  }
-  
-  uint32_t read(uint8_t* buf, uint32_t len);
-
-  void readEnd() {
-    rtrans_->write(rBuf_, rLen_);
-
-    // reset state
-    rLen_ = 0;
-    rPos_ = 0;
-  }
-
-  void write(const uint8_t* buf, uint32_t len);
-
-  void flush();
-
- protected:
-  shared_ptr<TTransport> trans_;
-  shared_ptr<TTransport> rtrans_;
-
-  uint8_t* rBuf_;
-  uint32_t rBufSize_;
-  uint32_t rPos_;
-  uint32_t rLen_;
-
-  uint8_t* wBuf_;
-  uint32_t wBufSize_;
-  uint32_t wLen_;
-};
-
-
-/**
- * Wraps a transport into a bufferedRouter instance.
- *
- * @author Aditya Agarwal <aditya@facebook.com>
- */
-class TBufferedRouterTransportFactory : public TTransportFactory {
- public:
-  TBufferedRouterTransportFactory(boost::shared_ptr<TTransport> rTrans): rTrans_(rTrans) {}
-
-  virtual ~TBufferedRouterTransportFactory() {}
-
-  /**
-   * Wraps the transport into a buffered one.
-   */
-  virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
-    return boost::shared_ptr<TTransport>(new TBufferedRouterTransport(trans, rTrans_));
-  }
-
- private:
-  boost::shared_ptr<TTransport> rTrans_;
-};
-
-}}} // facebook::thrift::transport
-
-#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp
index a885020..54b1f3f 100644
--- a/lib/cpp/src/transport/TTransportUtils.cpp
+++ b/lib/cpp/src/transport/TTransportUtils.cpp
@@ -41,6 +41,7 @@
   while ((len-pos) + wLen_ >= wBufSize_) {
     uint32_t copy = wBufSize_ - wLen_;
     memcpy(wBuf_ + wLen_, buf + pos, copy);
+
     transport_->write(wBuf_, wBufSize_);
     pos += copy;
     wLen_ = 0;
@@ -216,4 +217,73 @@
   wPos_ += len;
 }
 
+uint32_t TPipedTransport::read(uint8_t* buf, uint32_t len) {
+  uint32_t need = len;
+  
+  // We don't have enough data yet
+  if (rLen_-rPos_ < need) {
+    // Copy out whatever we have
+    if (rLen_-rPos_ > 0) {
+      memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
+      need -= rLen_-rPos_;
+      buf += rLen_-rPos_;
+      rPos_ = rLen_;
+    }
+
+    // Double the size of the underlying buffer if it is full
+    if (rLen_ == rBufSize_) {
+      rBufSize_ *=2;
+      rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
+    }
+    
+    // try to fill up the buffer
+    rLen_ += srcTrans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
+  }
+
+
+  // Hand over whatever we have
+  uint32_t give = need;
+  if (rLen_-rPos_ < give) {
+    give = rLen_-rPos_;
+  }
+  if (give > 0) {
+    memcpy(buf, rBuf_+rPos_, give);
+    rPos_ += give;
+    need -= give;
+  }
+
+  return (len - need);
+}
+
+void TPipedTransport::write(const uint8_t* buf, uint32_t len) {
+  if (len == 0) {
+    return;
+  }
+
+  // Make the buffer as big as it needs to be
+  if ((len + wLen_) >= wBufSize_) {
+    uint32_t newBufSize = wBufSize_*2;
+    while ((len + wLen_) >= newBufSize) {
+      newBufSize *= 2;
+    }
+    wBuf_ = (uint8_t *)realloc(wBuf_, sizeof(uint8_t) * newBufSize);
+    wBufSize_ = newBufSize;
+  }
+
+  // Copy into the buffer
+  memcpy(wBuf_ + wLen_, buf, len);
+  wLen_ += len;
+}
+
+void TPipedTransport::flush()  {
+  // Write out any data waiting in the write buffer
+  if (wLen_ > 0) {
+    srcTrans_->write(wBuf_, wLen_);
+    wLen_ = 0;
+  }
+
+  // Flush the underlying transport
+  srcTrans_->flush();
+}
+
 }}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h
index 79a137c..06547c7 100644
--- a/lib/cpp/src/transport/TTransportUtils.h
+++ b/lib/cpp/src/transport/TTransportUtils.h
@@ -330,6 +330,147 @@
 
 };
 
+/**
+ * TPipedTransport. This transport allows piping of a request from one 
+ * transport to another either when readEnd() or writeEnd(). The typicAL
+ * use case for this is to log a request or a reply to disk.
+ * The underlying buffer expands to a keep a copy of the entire 
+ * request/response.
+ *
+ * @author Aditya Agarwal <aditya@facebook.com>
+ */
+class TPipedTransport : public TTransport {
+ public:
+  TPipedTransport(boost::shared_ptr<TTransport> srcTrans, 
+                  boost::shared_ptr<TTransport> dstTrans) :
+    srcTrans_(srcTrans),
+    dstTrans_(dstTrans),
+    rBufSize_(512), rPos_(0), rLen_(0),
+    wBufSize_(512), wLen_(0) {
+
+    // default is to to pipe the request when readEnd() is called
+    pipeOnRead_ = true;
+    pipeOnWrite_ = false; 
+
+    rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_);
+    wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_);
+  }
+    
+  TPipedTransport(boost::shared_ptr<TTransport> srcTrans, 
+                  boost::shared_ptr<TTransport> dstTrans, 
+                  uint32_t sz) :
+    srcTrans_(srcTrans),
+    dstTrans_(dstTrans),
+    rBufSize_(512), rPos_(0), rLen_(0),
+    wBufSize_(sz), wLen_(0) {
+
+    rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_);
+    wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_);
+  }
+
+  ~TPipedTransport() {
+    free(rBuf_);
+    free(wBuf_);
+  }
+
+  bool isOpen() {
+    return srcTrans_->isOpen();
+  }
+  
+  bool peek() {    
+    if (rPos_ >= rLen_) {
+      // Double the size of the underlying buffer if it is full
+      if (rLen_ == rBufSize_) {
+        rBufSize_ *=2;
+        rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
+      }
+    
+      // try to fill up the buffer
+      rLen_ += srcTrans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
+    }
+    return (rLen_ > rPos_);
+  }
+
+
+  void open() {
+    srcTrans_->open();
+  }
+
+  void close() {
+    srcTrans_->close();
+  }
+
+  void setPipeOnRead(bool pipeVal) {
+    pipeOnRead_ = pipeVal;
+  }
+
+  void setPipeOnWrite(bool pipeVal) {
+    pipeOnWrite_ = pipeVal;
+  }
+  
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  void readEnd() {
+    if (pipeOnRead_) {
+      dstTrans_->write(rBuf_, rLen_);
+    }
+
+    // reset state
+    rLen_ = 0;
+    rPos_ = 0;
+  }
+
+  void write(const uint8_t* buf, uint32_t len);
+
+  void writeEnd() {
+    if (pipeOnWrite_) {
+      dstTrans_->write(wBuf_, wLen_);
+    }
+  }
+
+  void flush();
+
+ protected:
+  boost::shared_ptr<TTransport> srcTrans_;
+  boost::shared_ptr<TTransport> dstTrans_;
+
+  uint8_t* rBuf_;
+  uint32_t rBufSize_;
+  uint32_t rPos_;
+  uint32_t rLen_;
+
+  uint8_t* wBuf_;
+  uint32_t wBufSize_;
+  uint32_t wLen_;
+
+  bool pipeOnRead_;
+  bool pipeOnWrite_;
+};
+
+
+/**
+ * Wraps a transport into a pipedTransport instance.
+ *
+ * @author Aditya Agarwal <aditya@facebook.com>
+ */
+class TPipedTransportFactory : public TTransportFactory {
+ public:
+  TPipedTransportFactory(boost::shared_ptr<TTransport> dstTrans): dstTrans_(dstTrans) {}
+
+  virtual ~TPipedTransportFactory() {}
+
+  /**
+   * Wraps the base transport into a piped transport.
+   */
+  virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> srcTrans) {
+    return boost::shared_ptr<TTransport>(new TPipedTransport(srcTrans, dstTrans_));
+  }
+
+ private:
+  boost::shared_ptr<TTransport> dstTrans_;
+};
+
+
 }}} // facebook::thrift::transport