-- TBufferedRouterTransport being renamed to TPipedTransport
Summary:
- TBufferedRouterTransport is the most nonsensical name I have ever
heard of
Reviewed By: slee
Test Plan: it compiles
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664979 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.cpp b/lib/cpp/src/transport/TBufferedRouterTransport.cpp
deleted file mode 100644
index 60ab594..0000000
--- a/lib/cpp/src/transport/TBufferedRouterTransport.cpp
+++ /dev/null
@@ -1,76 +0,0 @@
-#include "TBufferedRouterTransport.h"
-#include "Thrift.h"
-using std::string;
-
-namespace facebook { namespace thrift { namespace transport {
-
-uint32_t TBufferedRouterTransport::read(uint8_t* buf, uint32_t len) {
- uint32_t need = len;
-
- // We don't have enough data yet
- if (rLen_-rPos_ < need) {
- // Copy out whatever we have
- if (rLen_-rPos_ > 0) {
- memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
- need -= rLen_-rPos_;
- buf += rLen_-rPos_;
- rPos_ = rLen_;
- }
-
- // Double the size of the underlying buffer if it is full
- if (rLen_ == rBufSize_) {
- rBufSize_ *=2;
- rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
- }
-
- // try to fill up the buffer
- rLen_ += trans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
- }
-
-
- // Hand over whatever we have
- uint32_t give = need;
- if (rLen_-rPos_ < give) {
- give = rLen_-rPos_;
- }
- if (give > 0) {
- memcpy(buf, rBuf_+rPos_, give);
- rPos_ += give;
- need -= give;
- }
-
- return (len - need);
-}
-
-void TBufferedRouterTransport::write(const uint8_t* buf, uint32_t len) {
- if (len == 0) {
- return;
- }
-
- // Make the buffer as big as it needs to be
- if ((len + wLen_) >= wBufSize_) {
- uint32_t newBufSize = wBufSize_*2;
- while ((len + wLen_) >= newBufSize) {
- newBufSize *= 2;
- }
- wBuf_ = (uint8_t *)realloc(wBuf_, sizeof(uint8_t) * newBufSize);
- wBufSize_ = newBufSize;
- }
-
- // Copy into the buffer
- memcpy(wBuf_ + wLen_, buf, len);
- wLen_ += len;
-}
-
-void TBufferedRouterTransport::flush() {
- // Write out any data waiting in the write buffer
- if (wLen_ > 0) {
- trans_->write(wBuf_, wLen_);
- wLen_ = 0;
- }
-
- // Flush the underlying transport
- trans_->flush();
-}
-
-}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.h b/lib/cpp/src/transport/TBufferedRouterTransport.h
deleted file mode 100644
index 0b4577c..0000000
--- a/lib/cpp/src/transport/TBufferedRouterTransport.h
+++ /dev/null
@@ -1,129 +0,0 @@
-#ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_
-#define _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_ 1
-
-#include "TTransport.h"
-#include "Thrift.h"
-#include <string>
-
-#include <boost/shared_ptr.hpp>
-
-namespace facebook { namespace thrift { namespace transport {
-
-using namespace boost;
-
-/**
- * BufferedRouterTransport. Funcationally equivalent to TBufferedTransport
- * but routes the request to another Transport (typical use case is to route
- * the request to TFileTransport to store the request on disk). The
- * underlying buffer expands to a keep a copy of the entire request/response.
- *
- * @author Aditya Agarwal <aditya@facebook.com>
- */
-class TBufferedRouterTransport : public TTransport {
- public:
- TBufferedRouterTransport(shared_ptr<TTransport> trans, shared_ptr<TTransport> rtrans) :
- trans_(trans),
- rtrans_(rtrans),
- rBufSize_(512), rPos_(0), rLen_(0),
- wBufSize_(512), wLen_(0) {
-
- rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_);
- wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_);
- }
-
- TBufferedRouterTransport(shared_ptr<TTransport> trans, shared_ptr<TTransport> rtrans, uint32_t sz) :
- trans_(trans),
- rtrans_(rtrans),
- rBufSize_(512), rPos_(0), rLen_(0),
- wBufSize_(sz), wLen_(0) {
-
- rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_);
- wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_);
- }
-
- ~TBufferedRouterTransport() {
- free(rBuf_);
- free(wBuf_);
- }
-
- bool isOpen() {
- return trans_->isOpen();
- }
-
- bool peek() {
- if (rPos_ >= rLen_) {
- // Double the size of the underlying buffer if it is full
- if (rLen_ == rBufSize_) {
- rBufSize_ *=2;
- rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
- }
-
- // try to fill up the buffer
- rLen_ += trans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
- }
- return (rLen_ > rPos_);
- }
-
-
- void open() {
- trans_->open();
- }
-
- void close() {
- trans_->close();
- }
-
- uint32_t read(uint8_t* buf, uint32_t len);
-
- void readEnd() {
- rtrans_->write(rBuf_, rLen_);
-
- // reset state
- rLen_ = 0;
- rPos_ = 0;
- }
-
- void write(const uint8_t* buf, uint32_t len);
-
- void flush();
-
- protected:
- shared_ptr<TTransport> trans_;
- shared_ptr<TTransport> rtrans_;
-
- uint8_t* rBuf_;
- uint32_t rBufSize_;
- uint32_t rPos_;
- uint32_t rLen_;
-
- uint8_t* wBuf_;
- uint32_t wBufSize_;
- uint32_t wLen_;
-};
-
-
-/**
- * Wraps a transport into a bufferedRouter instance.
- *
- * @author Aditya Agarwal <aditya@facebook.com>
- */
-class TBufferedRouterTransportFactory : public TTransportFactory {
- public:
- TBufferedRouterTransportFactory(boost::shared_ptr<TTransport> rTrans): rTrans_(rTrans) {}
-
- virtual ~TBufferedRouterTransportFactory() {}
-
- /**
- * Wraps the transport into a buffered one.
- */
- virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
- return boost::shared_ptr<TTransport>(new TBufferedRouterTransport(trans, rTrans_));
- }
-
- private:
- boost::shared_ptr<TTransport> rTrans_;
-};
-
-}}} // facebook::thrift::transport
-
-#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp
index a885020..54b1f3f 100644
--- a/lib/cpp/src/transport/TTransportUtils.cpp
+++ b/lib/cpp/src/transport/TTransportUtils.cpp
@@ -41,6 +41,7 @@
while ((len-pos) + wLen_ >= wBufSize_) {
uint32_t copy = wBufSize_ - wLen_;
memcpy(wBuf_ + wLen_, buf + pos, copy);
+
transport_->write(wBuf_, wBufSize_);
pos += copy;
wLen_ = 0;
@@ -216,4 +217,73 @@
wPos_ += len;
}
+uint32_t TPipedTransport::read(uint8_t* buf, uint32_t len) {
+ uint32_t need = len;
+
+ // We don't have enough data yet
+ if (rLen_-rPos_ < need) {
+ // Copy out whatever we have
+ if (rLen_-rPos_ > 0) {
+ memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
+ need -= rLen_-rPos_;
+ buf += rLen_-rPos_;
+ rPos_ = rLen_;
+ }
+
+ // Double the size of the underlying buffer if it is full
+ if (rLen_ == rBufSize_) {
+ rBufSize_ *=2;
+ rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
+ }
+
+ // try to fill up the buffer
+ rLen_ += srcTrans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
+ }
+
+
+ // Hand over whatever we have
+ uint32_t give = need;
+ if (rLen_-rPos_ < give) {
+ give = rLen_-rPos_;
+ }
+ if (give > 0) {
+ memcpy(buf, rBuf_+rPos_, give);
+ rPos_ += give;
+ need -= give;
+ }
+
+ return (len - need);
+}
+
+void TPipedTransport::write(const uint8_t* buf, uint32_t len) {
+ if (len == 0) {
+ return;
+ }
+
+ // Make the buffer as big as it needs to be
+ if ((len + wLen_) >= wBufSize_) {
+ uint32_t newBufSize = wBufSize_*2;
+ while ((len + wLen_) >= newBufSize) {
+ newBufSize *= 2;
+ }
+ wBuf_ = (uint8_t *)realloc(wBuf_, sizeof(uint8_t) * newBufSize);
+ wBufSize_ = newBufSize;
+ }
+
+ // Copy into the buffer
+ memcpy(wBuf_ + wLen_, buf, len);
+ wLen_ += len;
+}
+
+void TPipedTransport::flush() {
+ // Write out any data waiting in the write buffer
+ if (wLen_ > 0) {
+ srcTrans_->write(wBuf_, wLen_);
+ wLen_ = 0;
+ }
+
+ // Flush the underlying transport
+ srcTrans_->flush();
+}
+
}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h
index 79a137c..06547c7 100644
--- a/lib/cpp/src/transport/TTransportUtils.h
+++ b/lib/cpp/src/transport/TTransportUtils.h
@@ -330,6 +330,147 @@
};
+/**
+ * TPipedTransport. This transport allows piping of a request from one
+ * transport to another either when readEnd() or writeEnd(). The typicAL
+ * use case for this is to log a request or a reply to disk.
+ * The underlying buffer expands to a keep a copy of the entire
+ * request/response.
+ *
+ * @author Aditya Agarwal <aditya@facebook.com>
+ */
+class TPipedTransport : public TTransport {
+ public:
+ TPipedTransport(boost::shared_ptr<TTransport> srcTrans,
+ boost::shared_ptr<TTransport> dstTrans) :
+ srcTrans_(srcTrans),
+ dstTrans_(dstTrans),
+ rBufSize_(512), rPos_(0), rLen_(0),
+ wBufSize_(512), wLen_(0) {
+
+ // default is to to pipe the request when readEnd() is called
+ pipeOnRead_ = true;
+ pipeOnWrite_ = false;
+
+ rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_);
+ wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_);
+ }
+
+ TPipedTransport(boost::shared_ptr<TTransport> srcTrans,
+ boost::shared_ptr<TTransport> dstTrans,
+ uint32_t sz) :
+ srcTrans_(srcTrans),
+ dstTrans_(dstTrans),
+ rBufSize_(512), rPos_(0), rLen_(0),
+ wBufSize_(sz), wLen_(0) {
+
+ rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_);
+ wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_);
+ }
+
+ ~TPipedTransport() {
+ free(rBuf_);
+ free(wBuf_);
+ }
+
+ bool isOpen() {
+ return srcTrans_->isOpen();
+ }
+
+ bool peek() {
+ if (rPos_ >= rLen_) {
+ // Double the size of the underlying buffer if it is full
+ if (rLen_ == rBufSize_) {
+ rBufSize_ *=2;
+ rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
+ }
+
+ // try to fill up the buffer
+ rLen_ += srcTrans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
+ }
+ return (rLen_ > rPos_);
+ }
+
+
+ void open() {
+ srcTrans_->open();
+ }
+
+ void close() {
+ srcTrans_->close();
+ }
+
+ void setPipeOnRead(bool pipeVal) {
+ pipeOnRead_ = pipeVal;
+ }
+
+ void setPipeOnWrite(bool pipeVal) {
+ pipeOnWrite_ = pipeVal;
+ }
+
+ uint32_t read(uint8_t* buf, uint32_t len);
+
+ void readEnd() {
+ if (pipeOnRead_) {
+ dstTrans_->write(rBuf_, rLen_);
+ }
+
+ // reset state
+ rLen_ = 0;
+ rPos_ = 0;
+ }
+
+ void write(const uint8_t* buf, uint32_t len);
+
+ void writeEnd() {
+ if (pipeOnWrite_) {
+ dstTrans_->write(wBuf_, wLen_);
+ }
+ }
+
+ void flush();
+
+ protected:
+ boost::shared_ptr<TTransport> srcTrans_;
+ boost::shared_ptr<TTransport> dstTrans_;
+
+ uint8_t* rBuf_;
+ uint32_t rBufSize_;
+ uint32_t rPos_;
+ uint32_t rLen_;
+
+ uint8_t* wBuf_;
+ uint32_t wBufSize_;
+ uint32_t wLen_;
+
+ bool pipeOnRead_;
+ bool pipeOnWrite_;
+};
+
+
+/**
+ * Wraps a transport into a pipedTransport instance.
+ *
+ * @author Aditya Agarwal <aditya@facebook.com>
+ */
+class TPipedTransportFactory : public TTransportFactory {
+ public:
+ TPipedTransportFactory(boost::shared_ptr<TTransport> dstTrans): dstTrans_(dstTrans) {}
+
+ virtual ~TPipedTransportFactory() {}
+
+ /**
+ * Wraps the base transport into a piped transport.
+ */
+ virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> srcTrans) {
+ return boost::shared_ptr<TTransport>(new TPipedTransport(srcTrans, dstTrans_));
+ }
+
+ private:
+ boost::shared_ptr<TTransport> dstTrans_;
+};
+
+
}}} // facebook::thrift::transport