-- Thrift Log File

Summary:
-- This is the thrifty version of Pillar's batch_writer
-- Cleaned up a lot of the code in batch writer and made it conform to Thrift's strict coding standards
-- Added TBufferedRouterTransport.h/cc to actually route messsages via readEnd() to the file writer. It's
   not quite as easy to route the messages in Thrift as it was in Pillar

Reviewed By: Slee

Test Plan: Tested by making sure that the file was recording data

Notes:
-- The real correctness test will be when I finish writing TLogFileTransport (pillar_logfile.cpp).


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664826 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index 2cb4759..a92de3e 100644
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -11,6 +11,8 @@
                     src/concurrency/TimerManager.cc \
 	            src/protocol/TBinaryProtocol.cc \
                     src/transport/TBufferedTransport.cc \
+                    src/transport/TBufferedFileWriter.cc \
+                    src/transport/TBufferedRouterTransport.cc \
                     src/transport/TFramedTransport.cc \
                     src/transport/TMemoryBuffer.cc \
                     src/transport/TSocket.cc \
@@ -30,7 +32,8 @@
 include_thrift_HEADERS = \
 			 config.h \
 			 src/Thrift.h \
-			 src/TProcessor.h
+			 src/TProcessor.h \
+			 src/TLogging.h
 
 include_concurrencydir = $(include_thriftdir)/concurrency
 include_concurrency_HEADERS = \
@@ -59,7 +62,9 @@
                          src/transport/TTransport.h \
                          src/transport/TTransportException.h \
                          src/transport/TTransportFactory.h \
-                         src/transport/TBufferedTransportFactory.h
+                         src/transport/TBufferedTransportFactory.h \
+                         src/transport/TBufferedFileWriter.h \
+                         src/transport/TBufferedRouterTransport.h
 
 include_serverdir = $(include_thriftdir)/server
 include_server_HEADERS = \
diff --git a/lib/cpp/src/transport/TBufferedFileWriter.cc b/lib/cpp/src/transport/TBufferedFileWriter.cc
new file mode 100644
index 0000000..ac46b97
--- /dev/null
+++ b/lib/cpp/src/transport/TBufferedFileWriter.cc
@@ -0,0 +1,348 @@
+#include "TBufferedFileWriter.h"
+
+#include <pthread.h>
+#include <cassert>
+#include <cstdlib>
+#include <string>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <fcntl.h>
+#include <errno.h>
+
+using std::string;
+
+namespace facebook { namespace thrift { namespace transport { 
+
+TBufferedFileWriter::TBufferedFileWriter(string filename, uint32_t sz) {
+  init(filename, sz, 0, 0);
+}
+
+TBufferedFileWriter::TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset) {
+  init(filename, sz, fd, offset);
+}
+
+void TBufferedFileWriter::init(string filename, uint32_t sz, int fd, long long offset) {
+  // validate buffer size
+  sz_ = sz;
+  if (sz_ <= 0) {
+    throw TTransportException("invalid input buffer size");
+  }
+
+  // set file-related variables
+  fd_ = 0;
+  resetOutputFile(fd, filename, offset);
+
+  // set default values of flush related params
+  flushMaxBytes_ = 1024 * 100;
+  flushMaxUs_ = 20 * 1000;
+
+  // allocate event buffer
+  buffer_ = new eventInfo[sz_];
+
+  // buffer is initially empty
+  isEmpty_ = true;
+  isFull_  = false;
+
+  // both head and tail are initially at 0
+  headPos_ = 0;
+  tailPos_ = 0;
+
+  // for lack of a better option, set chunk size to 0. Users can change this to whatever they want
+  chunkSize_ = 0;
+
+  // initialize all the condition vars/mutexes
+  pthread_mutex_init(&mutex_, NULL);
+  pthread_cond_init(&notFull_, NULL);
+  pthread_cond_init(&notEmpty_, NULL);
+  pthread_cond_init(&flushed_, NULL);
+
+  // not closing the file during init
+  closing_ = false;
+
+  // spawn writer thread
+  pthread_create(&writer_, NULL, startWriterThread, (void *)this);
+}
+
+void TBufferedFileWriter::resetOutputFile(int fd, string filename, long long offset) {
+  filename_ = filename;
+  offset_ = offset;
+
+  // check if current file is still open
+  if (fd_ > 0) {
+    // TODO: unclear if this should throw an error
+    fprintf(stderr, "error, current file not closed (trying to open %s)\n", filename_.c_str());
+    ::close(fd_);
+  }
+  fd_ = fd;
+}
+
+
+TBufferedFileWriter::~TBufferedFileWriter() {
+  // flush output buffer
+  flush();
+
+  // send a signal to write thread to end
+  closing_ = true;
+  pthread_join(writer_, NULL);
+
+  delete[] buffer_;
+
+  // TODO: should the file be closed here?
+}
+
+
+void TBufferedFileWriter::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
+  // make sure that event size is valid
+  if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) {
+    //    ERROR("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_);
+    return;
+  }
+
+  if (eventLen == 0) {
+    ERROR("cannot enqueue an empty event");
+    return;
+  }
+
+  eventInfo toEnqueue;
+  uint8_t* bufCopy = (uint8_t *)malloc(sizeof(uint8_t) * eventLen);
+  toEnqueue.payLoad_ = bufCopy;
+  toEnqueue.eventSize_ = eventLen;
+
+  return enqueueEvent(toEnqueue, blockUntilFlush);
+}
+
+void TBufferedFileWriter::enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush) {
+  // Lock mutex
+  pthread_mutex_lock(&mutex_);
+  // Can't enqueue while buffer is full
+  while(isFull_) {
+    pthread_cond_wait(&notFull_, &mutex_);
+  }
+
+  // make a copy and enqueue at tail of buffer
+  buffer_[tailPos_] = toEnqueue;
+  tailPos_ = (tailPos_+1) % sz_;
+  
+  // mark the buffer as non-empty
+  isEmpty_ = false;
+  
+  // circular buffer has wrapped around (and is full)
+  if(tailPos_ == headPos_) {
+    //    DEBUG("queue is full");
+    isFull_ = true;
+  }
+
+  // signal anybody who's waiting for the buffer to be non-empty
+  pthread_cond_signal(&notEmpty_);
+  if(blockUntilFlush) {
+    pthread_cond_wait(&flushed_, &mutex_);
+  }
+
+  // TODO: don't return until flushed to disk
+  // this really should be a loop where it makes sure it got flushed
+  // because condition variables can get triggered by the os for no reason 
+  // it is probably a non-factor for the time being
+  pthread_mutex_unlock(&mutex_);
+
+}
+
+eventInfo TBufferedFileWriter::dequeueEvent(long long deadline) {
+  //deadline time struc
+  struct timespec ts;
+  if(deadline) {
+    ts.tv_sec = deadline/(1000*1000);
+    ts.tv_nsec = (deadline%(1000*1000))*1000;
+  }
+
+  // wait for the queue to fill up
+  pthread_mutex_lock(&mutex_);
+  while(isEmpty_) {
+    // do a timed wait on the condition variable
+    if(deadline) {
+      int e = pthread_cond_timedwait(&notEmpty_, &mutex_, &ts);
+      if(e == ETIMEDOUT) {
+        break;
+      }
+    }
+    else {
+      // just wait until the buffer gets an item
+      pthread_cond_wait(&notEmpty_, &mutex_);
+    }
+  }
+
+  string ret;
+  bool doSignal = false;
+
+  // could be empty if we timed out
+  eventInfo retEvent;
+  if(!isEmpty_) {
+    retEvent = buffer_[headPos_];
+    headPos_ = (headPos_+1) % sz_;
+
+    isFull_ = false;
+    doSignal = true;
+
+    // check if this is the last item in the buffer
+    if(headPos_ == tailPos_) {
+      isEmpty_ = true;
+    }
+  }
+
+  // unlock the mutex and signal if required
+  pthread_mutex_unlock(&mutex_);
+  if(doSignal) {
+    pthread_cond_signal(&notFull_);
+  }
+
+  return retEvent;
+}
+
+
+void TBufferedFileWriter::flush()
+{
+  eventInfo flushEvent;
+  flushEvent.payLoad_ = NULL;
+  flushEvent.eventSize_ = 0;
+
+  notFlushed_ = true;
+
+  enqueueEvent(flushEvent, false);
+
+  // wait for flush to take place
+  pthread_mutex_lock(&mutex_);
+
+  while(notFlushed_) {
+    pthread_cond_wait(&flushed_, &mutex_);
+  }
+
+  pthread_mutex_unlock(&mutex_);
+}
+
+void TBufferedFileWriter::openOutputFile() {
+  mode_t mode = S_IRUSR| S_IWUSR| S_IRGRP | S_IROTH;
+  fd_ = ::open(filename_.c_str(), O_WRONLY | O_CREAT | O_APPEND, mode);
+
+  // make sure open call was successful
+  if(fd_ == -1) {
+    char errorMsg[1024];
+    sprintf(errorMsg, "TBufferedFileWriter: Could not open file: %s", filename_.c_str());
+    perror(errorMsg);
+    throw TTransportException(errorMsg);
+  }
+}
+
+uint32_t TBufferedFileWriter::getCurrentTime() {
+  long long ret;
+  struct timeval tv;
+  gettimeofday(&tv, NULL);
+  ret = tv.tv_sec;
+  ret = ret*1000*1000 + tv.tv_usec;
+  return ret;
+}
+
+
+void TBufferedFileWriter::writerThread() {
+  // open file if it is not open
+  if(!fd_) {
+    openOutputFile();
+  }
+
+  // Figure out the next time by which a flush must take place
+  long long nextFlush = getCurrentTime() + flushMaxUs_;
+  uint32_t unflushed = 0;
+
+  while(1) {
+    // this will only be true when the destructor is being invoked
+    if(closing_) {
+      if(-1 == ::close(fd_)) {
+        perror("TBufferedFileWriter: error in close");
+      }
+      throw TTransportException("error in file close");
+    }
+
+    //long long start = now();
+    eventInfo outEvent = dequeueEvent(nextFlush);
+
+    // sanity check on event
+    if ( (maxEventSize_ > 0) && (outEvent.eventSize_ > maxEventSize_)) {
+      ERROR("msg size is greater than max event size: %u > %u\n", outEvent.eventSize_, maxEventSize_);
+      continue;
+    }
+    //long long diff = now()-start;
+    //DEBUG("got a dequeue of size %d after %lld ms\n", (int)s.size(), diff/1000);
+
+    // If chunking is required, then make sure that msg does not cross chunk boundary
+    if( (outEvent.eventSize_ > 0) && (chunkSize_ != 0)) {
+
+      // event size must be less than chunk size
+      if(outEvent.eventSize_ > chunkSize_) {
+        ERROR("TBufferedFileWriter: event size(%u) is greater than chunk size(%u): skipping event",
+              outEvent.eventSize_, chunkSize_);
+        continue;
+      }
+
+      long long chunk1 = offset_/chunkSize_;
+      long long chunk2 = (offset_ + outEvent.eventSize_ - 1)/chunkSize_;
+      
+      // if adding this event will cross a chunk boundary, pad the chunk with zeros
+      if(chunk1 != chunk2) {
+        int padding = (int)(chunk2*chunkSize_ - offset_);
+
+        // sanity check
+        if (padding <= 0) {
+          DEBUG("Padding is empty, skipping event");
+          continue;
+        }
+        if (padding > (int32_t)chunkSize_) {
+          DEBUG("padding is larger than chunk size, skipping event");
+          continue;
+        }
+        //        DEBUG("padding %d zeros to get to chunk %lld\n", padding, chunk2);
+        uint8_t zeros[padding];
+        bzero(zeros, padding);
+        if(-1 == ::write(fd_, zeros, padding)) {
+          perror("TBufferedFileWriter: error while padding zeros");
+          throw TTransportException("TBufferedFileWriter: error while padding zeros");
+        }
+        unflushed += padding;
+        offset_ += padding;
+      }
+    }
+
+    // write the dequeued event to the file
+    if(outEvent.eventSize_ > 0) {
+      if(-1 == ::write(fd_, outEvent.payLoad_, outEvent.eventSize_)) {
+        perror("TBufferedFileWriter: error while writing event");
+        // TODO: should this trigger an exception or simply continue?
+        throw TTransportException("TBufferedFileWriter: error while writing event");
+      }
+
+      // deallocate payload
+      free(outEvent.payLoad_);
+
+      unflushed += outEvent.eventSize_;
+      offset_ += outEvent.eventSize_;
+    }
+
+    // couple of cases from which a flush could be triggered
+    if((getCurrentTime() >= nextFlush && unflushed > 0) ||
+       unflushed > flushMaxBytes_ ||
+       (outEvent.eventSize_ == 0) ) {
+      //Debug("flushing %d bytes to %s (%d %d, full? %d)", unflushed, filename_.c_str(), headPos_, tailPos_, isFull_);
+
+      // sync (force flush) file to disk
+      fsync(fd_);
+      nextFlush = getCurrentTime() + flushMaxUs_;
+      unflushed = 0;
+
+      // notify anybody(thing?) waiting for flush completion
+      pthread_mutex_lock(&mutex_);
+      notFlushed_ = false;
+      pthread_mutex_unlock(&mutex_);
+      pthread_cond_broadcast(&flushed_);
+    }
+  }
+
+}
+
+}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TBufferedFileWriter.h b/lib/cpp/src/transport/TBufferedFileWriter.h
new file mode 100644
index 0000000..c327aab
--- /dev/null
+++ b/lib/cpp/src/transport/TBufferedFileWriter.h
@@ -0,0 +1,138 @@
+#ifndef _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_
+#define _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_ 1
+
+#include "TTransport.h"
+#include "Thrift.h"
+
+#include <string>
+
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace transport { 
+
+using namespace boost;
+using std::string;
+
+// Data pertaining to a single event
+typedef struct eventInfo {
+   uint8_t* payLoad_;
+   uint32_t eventSize_;
+
+  eventInfo():payLoad_(NULL), eventSize_(0){};
+} eventInfo;
+
+
+/**
+ * Class that stores a circular in-memory event/message buffer and writes 
+ * elements to disk when the buffer becomes full or a flush is triggered.
+ *
+ * @author Aditya Agarwal <aditya@facebook.com>
+ */
+class TBufferedFileWriter : public TTransport {
+ public:
+  void setFlushMaxUs(uint32_t flushMaxUs) {
+    flushMaxUs_ = flushMaxUs;
+  }
+  uint32_t getFlushMaxUs() {
+    return flushMaxUs_;
+  }
+
+  void setFlushMaxBytes(uint32_t flushMaxBytes) {
+    flushMaxBytes_ = flushMaxBytes;
+  }
+  uint32_t getFlushMaxBytes() {
+    return flushMaxBytes_;
+  }
+
+  void setChunkSize(uint32_t chunkSize) {
+    chunkSize_ = chunkSize;
+  }
+  uint32_t getChunkSize() {
+    return chunkSize_;
+  }
+
+  void setMaxEventSize(uint32_t maxEventSize) {
+    maxEventSize_ = maxEventSize;
+  }
+  uint32_t getMaxEventSize() {
+    return maxEventSize_;
+  }
+
+  TBufferedFileWriter(string filename, uint32_t sz);
+  TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset);
+  void init(string filename, uint32_t sz, int fd, long long offset);
+  ~TBufferedFileWriter();
+
+  void resetOutputFile(int fd, string filename, long long offset);
+
+  void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
+  void enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush);
+  void write(const uint8_t* buf, uint32_t len) {
+    enqueueEvent(buf, len, false);
+  }
+
+  eventInfo dequeueEvent(long long deadline);
+  void flush();
+
+  // control for writer thread
+  static void* startWriterThread(void* ptr) {
+    (((TBufferedFileWriter*)ptr)->writerThread());
+    return 0;
+  }
+  void writerThread();
+
+
+ private:
+  // circular buffer to hold data in before it is flushed. This is an array of strings. Each
+  // element of the array stores a msg that needs to be written to the file
+  eventInfo* buffer_;
+  
+  // size of string buffer
+  uint32_t sz_;
+
+  // size of chunks that file will be split up into
+  uint32_t chunkSize_;
+
+  // max number of microseconds that can pass without flushing
+  uint32_t flushMaxUs_;
+
+  // max number of bytes that can be written without flushing
+  uint32_t flushMaxBytes_;
+
+  // max event size
+  uint32_t maxEventSize_;
+  
+  // writer thread id
+  pthread_t writer_;
+
+  // variables that determine position of head/tail of circular buffer
+  int headPos_, tailPos_;
+
+  // variables indicating whether the buffer is full or empty
+  bool isFull_, isEmpty_;
+  pthread_cond_t notFull_, notEmpty_;
+  bool closing_;
+
+  // To keep track of whether the buffer has been flushed
+  pthread_cond_t flushed_;
+  bool notFlushed_;
+
+  // Mutex that is grabbed when enqueueing, dequeueing and flushing
+  // from the circular buffer
+  pthread_mutex_t mutex_;
+
+  // File information
+  string filename_;
+  int fd_;
+
+  // Offset within the file
+  long long offset_;
+
+  void openOutputFile();
+  uint32_t getCurrentTime();
+
+};
+
+}}}
+
+#endif // _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_
diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.cc b/lib/cpp/src/transport/TBufferedRouterTransport.cc
new file mode 100644
index 0000000..7c09953
--- /dev/null
+++ b/lib/cpp/src/transport/TBufferedRouterTransport.cc
@@ -0,0 +1,79 @@
+#include "TBufferedRouterTransport.h"
+#include "Thrift.h"
+using std::string;
+
+namespace facebook { namespace thrift { namespace transport { 
+
+uint32_t TBufferedRouterTransport::read(uint8_t* buf, uint32_t len) {
+  uint32_t need = len;
+  
+  // We don't have enough data yet
+  if (rLen_-rPos_ < need) {
+    // Copy out whatever we have
+    if (rLen_-rPos_ > 0) {
+      memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
+      need -= rLen_-rPos_;
+      buf += rLen_-rPos_;
+      rPos_ = rLen_;
+    }
+
+    // Double the size of the underlying buffer if it is full
+    if (rLen_ == rBufSize_) {
+      rBufSize_ *=2;
+      rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
+    }
+    
+    // try to fill up the buffer
+    rLen_ += trans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
+  }
+
+  // Hand over whatever we have
+  uint32_t give = need;
+  if (rLen_-rPos_ < give) {
+    give = rLen_-rPos_;
+  }
+  memcpy(buf, rBuf_+rPos_, give);
+  rPos_ += give;
+  need -= give;
+
+  return (len - need);
+}
+
+void TBufferedRouterTransport::write(const uint8_t* buf, uint32_t len) {
+  if (len == 0) {
+    return;
+  }
+
+  if (len + wLen_ >= wBufSize_) {
+    uint32_t copy = wBufSize_ - wLen_;
+    memcpy(wBuf_ + wLen_, buf, copy);
+    trans_->write(wBuf_+wPos_, wBufSize_-wPos_);
+    wLen_ += copy;
+    wPos_ = wLen_;
+
+    uint32_t left = len-copy;
+    if (left > 0) {
+      // double the size of the write buffer
+      wBuf_ = (uint8_t *)realloc(wBuf_, sizeof(uint8_t) * wBufSize_ * 2);
+      memcpy(wBuf_ + wLen_, buf+copy, left);
+      wLen_ += left;
+      wBufSize_*=2;
+    }
+  } else {
+    memcpy(wBuf_+wLen_, buf, len);
+    wLen_ += len;
+  }
+}
+
+void TBufferedRouterTransport::flush()  {
+  // Write out any data waiting in the write buffer
+  if (wLen_-wPos_ > 0) {
+    trans_->write(wBuf_+wPos_, wLen_-wPos_);
+    wPos_ = wLen_;
+  }
+
+  // Flush the underlying transport
+  trans_->flush();
+}
+
+}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.h b/lib/cpp/src/transport/TBufferedRouterTransport.h
new file mode 100644
index 0000000..b01faac
--- /dev/null
+++ b/lib/cpp/src/transport/TBufferedRouterTransport.h
@@ -0,0 +1,92 @@
+#ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_ 1
+
+#include "TTransport.h"
+#include "Thrift.h"
+#include <string>
+
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace transport { 
+                         
+using namespace boost;
+
+/**
+ * BufferedRouterTransport. Funcationally equivalent to TBufferedTransport
+ * but routes the request to another Transport (typical use case is to route
+ * the request to TBufferedFileWriter to store the request on disk). The
+ * underlying buffer expands to a keep a copy of the entire request/response.
+ *
+ * @author Aditya Agarwal <aditya@facebook.com>
+ */
+class TBufferedRouterTransport : public TTransport {
+ public:
+  TBufferedRouterTransport(shared_ptr<TTransport> trans, shared_ptr<TTransport> rtrans) :
+    trans_(trans),
+    rtrans_(rtrans),
+    rBufSize_(512), rPos_(0), rLen_(0),
+    wBufSize_(512), wPos_(0), wLen_(0) {
+
+    rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_);
+    wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_);
+  }
+    
+  TBufferedRouterTransport(shared_ptr<TTransport> trans, shared_ptr<TTransport> rtrans, uint32_t sz) :
+    trans_(trans),
+    rtrans_(rtrans),
+    rBufSize_(512), rPos_(0), rLen_(0),
+    wBufSize_(sz), wPos_(0), wLen_(0) {
+
+    rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_);
+    wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_);
+  }
+
+  ~TBufferedRouterTransport() {
+    free(rBuf_);
+    free(wBuf_);
+  }
+
+  bool isOpen() {
+    return trans_->isOpen();
+  }
+  
+  void open() {
+    trans_->open();
+  }
+
+  void close() {
+    trans_->close();
+  }
+  
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  void readEnd() {
+    rtrans_->write(rBuf_, rLen_);
+
+    // reset state
+    rLen_ = 0;
+    rPos_ = 0;
+  }
+
+  void write(const uint8_t* buf, uint32_t len);
+
+  void flush();
+
+ protected:
+  shared_ptr<TTransport> trans_;
+  shared_ptr<TTransport> rtrans_;
+
+  uint8_t* rBuf_;
+  uint32_t rBufSize_;
+  uint32_t rPos_;
+  uint32_t rLen_;
+
+  uint8_t* wBuf_;
+  uint32_t wBufSize_;
+  uint32_t wPos_;
+  uint32_t wLen_;
+};
+
+}}} // facebook::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TBufferedRouterTransportFactory.h b/lib/cpp/src/transport/TBufferedRouterTransportFactory.h
new file mode 100644
index 0000000..2b82570
--- /dev/null
+++ b/lib/cpp/src/transport/TBufferedRouterTransportFactory.h
@@ -0,0 +1,35 @@
+#ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORTFACTORY_H_
+#define _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORTFACTORY_H_ 1
+
+#include <transport/TTransportFactory.h>
+#include <transport/TBufferedRouterTransport.h>
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace transport { 
+
+/**
+ * Wraps a transport into a bufferedRouter instance.
+ *
+ * @author Aditya Agarwal <aditya@facebook.com>
+ */
+class TBufferedRouterTransportFactory : public TTransportFactory {
+ public:
+  TBufferedRouterTransportFactory(boost::shared_ptr<TTransport> rTrans): rTrans_(rTrans) {}
+
+  virtual ~TBufferedRouterTransportFactory() {}
+
+  /**
+   * Wraps the transport into a buffered one.
+   */
+  virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
+    boost::shared_ptr<TTransport> buffered(new TBufferedRouterTransport(trans, rTrans_));
+    return std::make_pair(buffered, buffered);
+  }
+
+ private:
+  boost::shared_ptr<TTransport> rTrans_;
+};
+
+}}}
+
+#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORTFACTORY_H_