-- Thrift Log File
Summary:
-- This is the thrifty version of Pillar's batch_writer
-- Cleaned up a lot of the code in batch writer and made it conform to Thrift's strict coding standards
-- Added TBufferedRouterTransport.h/cc to actually route messsages via readEnd() to the file writer. It's
not quite as easy to route the messages in Thrift as it was in Pillar
Reviewed By: Slee
Test Plan: Tested by making sure that the file was recording data
Notes:
-- The real correctness test will be when I finish writing TLogFileTransport (pillar_logfile.cpp).
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664826 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index 2cb4759..a92de3e 100644
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -11,6 +11,8 @@
src/concurrency/TimerManager.cc \
src/protocol/TBinaryProtocol.cc \
src/transport/TBufferedTransport.cc \
+ src/transport/TBufferedFileWriter.cc \
+ src/transport/TBufferedRouterTransport.cc \
src/transport/TFramedTransport.cc \
src/transport/TMemoryBuffer.cc \
src/transport/TSocket.cc \
@@ -30,7 +32,8 @@
include_thrift_HEADERS = \
config.h \
src/Thrift.h \
- src/TProcessor.h
+ src/TProcessor.h \
+ src/TLogging.h
include_concurrencydir = $(include_thriftdir)/concurrency
include_concurrency_HEADERS = \
@@ -59,7 +62,9 @@
src/transport/TTransport.h \
src/transport/TTransportException.h \
src/transport/TTransportFactory.h \
- src/transport/TBufferedTransportFactory.h
+ src/transport/TBufferedTransportFactory.h \
+ src/transport/TBufferedFileWriter.h \
+ src/transport/TBufferedRouterTransport.h
include_serverdir = $(include_thriftdir)/server
include_server_HEADERS = \
diff --git a/lib/cpp/src/transport/TBufferedFileWriter.cc b/lib/cpp/src/transport/TBufferedFileWriter.cc
new file mode 100644
index 0000000..ac46b97
--- /dev/null
+++ b/lib/cpp/src/transport/TBufferedFileWriter.cc
@@ -0,0 +1,348 @@
+#include "TBufferedFileWriter.h"
+
+#include <pthread.h>
+#include <cassert>
+#include <cstdlib>
+#include <string>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <fcntl.h>
+#include <errno.h>
+
+using std::string;
+
+namespace facebook { namespace thrift { namespace transport {
+
+TBufferedFileWriter::TBufferedFileWriter(string filename, uint32_t sz) {
+ init(filename, sz, 0, 0);
+}
+
+TBufferedFileWriter::TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset) {
+ init(filename, sz, fd, offset);
+}
+
+void TBufferedFileWriter::init(string filename, uint32_t sz, int fd, long long offset) {
+ // validate buffer size
+ sz_ = sz;
+ if (sz_ <= 0) {
+ throw TTransportException("invalid input buffer size");
+ }
+
+ // set file-related variables
+ fd_ = 0;
+ resetOutputFile(fd, filename, offset);
+
+ // set default values of flush related params
+ flushMaxBytes_ = 1024 * 100;
+ flushMaxUs_ = 20 * 1000;
+
+ // allocate event buffer
+ buffer_ = new eventInfo[sz_];
+
+ // buffer is initially empty
+ isEmpty_ = true;
+ isFull_ = false;
+
+ // both head and tail are initially at 0
+ headPos_ = 0;
+ tailPos_ = 0;
+
+ // for lack of a better option, set chunk size to 0. Users can change this to whatever they want
+ chunkSize_ = 0;
+
+ // initialize all the condition vars/mutexes
+ pthread_mutex_init(&mutex_, NULL);
+ pthread_cond_init(¬Full_, NULL);
+ pthread_cond_init(¬Empty_, NULL);
+ pthread_cond_init(&flushed_, NULL);
+
+ // not closing the file during init
+ closing_ = false;
+
+ // spawn writer thread
+ pthread_create(&writer_, NULL, startWriterThread, (void *)this);
+}
+
+void TBufferedFileWriter::resetOutputFile(int fd, string filename, long long offset) {
+ filename_ = filename;
+ offset_ = offset;
+
+ // check if current file is still open
+ if (fd_ > 0) {
+ // TODO: unclear if this should throw an error
+ fprintf(stderr, "error, current file not closed (trying to open %s)\n", filename_.c_str());
+ ::close(fd_);
+ }
+ fd_ = fd;
+}
+
+
+TBufferedFileWriter::~TBufferedFileWriter() {
+ // flush output buffer
+ flush();
+
+ // send a signal to write thread to end
+ closing_ = true;
+ pthread_join(writer_, NULL);
+
+ delete[] buffer_;
+
+ // TODO: should the file be closed here?
+}
+
+
+void TBufferedFileWriter::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
+ // make sure that event size is valid
+ if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) {
+ // ERROR("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_);
+ return;
+ }
+
+ if (eventLen == 0) {
+ ERROR("cannot enqueue an empty event");
+ return;
+ }
+
+ eventInfo toEnqueue;
+ uint8_t* bufCopy = (uint8_t *)malloc(sizeof(uint8_t) * eventLen);
+ toEnqueue.payLoad_ = bufCopy;
+ toEnqueue.eventSize_ = eventLen;
+
+ return enqueueEvent(toEnqueue, blockUntilFlush);
+}
+
+void TBufferedFileWriter::enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush) {
+ // Lock mutex
+ pthread_mutex_lock(&mutex_);
+ // Can't enqueue while buffer is full
+ while(isFull_) {
+ pthread_cond_wait(¬Full_, &mutex_);
+ }
+
+ // make a copy and enqueue at tail of buffer
+ buffer_[tailPos_] = toEnqueue;
+ tailPos_ = (tailPos_+1) % sz_;
+
+ // mark the buffer as non-empty
+ isEmpty_ = false;
+
+ // circular buffer has wrapped around (and is full)
+ if(tailPos_ == headPos_) {
+ // DEBUG("queue is full");
+ isFull_ = true;
+ }
+
+ // signal anybody who's waiting for the buffer to be non-empty
+ pthread_cond_signal(¬Empty_);
+ if(blockUntilFlush) {
+ pthread_cond_wait(&flushed_, &mutex_);
+ }
+
+ // TODO: don't return until flushed to disk
+ // this really should be a loop where it makes sure it got flushed
+ // because condition variables can get triggered by the os for no reason
+ // it is probably a non-factor for the time being
+ pthread_mutex_unlock(&mutex_);
+
+}
+
+eventInfo TBufferedFileWriter::dequeueEvent(long long deadline) {
+ //deadline time struc
+ struct timespec ts;
+ if(deadline) {
+ ts.tv_sec = deadline/(1000*1000);
+ ts.tv_nsec = (deadline%(1000*1000))*1000;
+ }
+
+ // wait for the queue to fill up
+ pthread_mutex_lock(&mutex_);
+ while(isEmpty_) {
+ // do a timed wait on the condition variable
+ if(deadline) {
+ int e = pthread_cond_timedwait(¬Empty_, &mutex_, &ts);
+ if(e == ETIMEDOUT) {
+ break;
+ }
+ }
+ else {
+ // just wait until the buffer gets an item
+ pthread_cond_wait(¬Empty_, &mutex_);
+ }
+ }
+
+ string ret;
+ bool doSignal = false;
+
+ // could be empty if we timed out
+ eventInfo retEvent;
+ if(!isEmpty_) {
+ retEvent = buffer_[headPos_];
+ headPos_ = (headPos_+1) % sz_;
+
+ isFull_ = false;
+ doSignal = true;
+
+ // check if this is the last item in the buffer
+ if(headPos_ == tailPos_) {
+ isEmpty_ = true;
+ }
+ }
+
+ // unlock the mutex and signal if required
+ pthread_mutex_unlock(&mutex_);
+ if(doSignal) {
+ pthread_cond_signal(¬Full_);
+ }
+
+ return retEvent;
+}
+
+
+void TBufferedFileWriter::flush()
+{
+ eventInfo flushEvent;
+ flushEvent.payLoad_ = NULL;
+ flushEvent.eventSize_ = 0;
+
+ notFlushed_ = true;
+
+ enqueueEvent(flushEvent, false);
+
+ // wait for flush to take place
+ pthread_mutex_lock(&mutex_);
+
+ while(notFlushed_) {
+ pthread_cond_wait(&flushed_, &mutex_);
+ }
+
+ pthread_mutex_unlock(&mutex_);
+}
+
+void TBufferedFileWriter::openOutputFile() {
+ mode_t mode = S_IRUSR| S_IWUSR| S_IRGRP | S_IROTH;
+ fd_ = ::open(filename_.c_str(), O_WRONLY | O_CREAT | O_APPEND, mode);
+
+ // make sure open call was successful
+ if(fd_ == -1) {
+ char errorMsg[1024];
+ sprintf(errorMsg, "TBufferedFileWriter: Could not open file: %s", filename_.c_str());
+ perror(errorMsg);
+ throw TTransportException(errorMsg);
+ }
+}
+
+uint32_t TBufferedFileWriter::getCurrentTime() {
+ long long ret;
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ ret = tv.tv_sec;
+ ret = ret*1000*1000 + tv.tv_usec;
+ return ret;
+}
+
+
+void TBufferedFileWriter::writerThread() {
+ // open file if it is not open
+ if(!fd_) {
+ openOutputFile();
+ }
+
+ // Figure out the next time by which a flush must take place
+ long long nextFlush = getCurrentTime() + flushMaxUs_;
+ uint32_t unflushed = 0;
+
+ while(1) {
+ // this will only be true when the destructor is being invoked
+ if(closing_) {
+ if(-1 == ::close(fd_)) {
+ perror("TBufferedFileWriter: error in close");
+ }
+ throw TTransportException("error in file close");
+ }
+
+ //long long start = now();
+ eventInfo outEvent = dequeueEvent(nextFlush);
+
+ // sanity check on event
+ if ( (maxEventSize_ > 0) && (outEvent.eventSize_ > maxEventSize_)) {
+ ERROR("msg size is greater than max event size: %u > %u\n", outEvent.eventSize_, maxEventSize_);
+ continue;
+ }
+ //long long diff = now()-start;
+ //DEBUG("got a dequeue of size %d after %lld ms\n", (int)s.size(), diff/1000);
+
+ // If chunking is required, then make sure that msg does not cross chunk boundary
+ if( (outEvent.eventSize_ > 0) && (chunkSize_ != 0)) {
+
+ // event size must be less than chunk size
+ if(outEvent.eventSize_ > chunkSize_) {
+ ERROR("TBufferedFileWriter: event size(%u) is greater than chunk size(%u): skipping event",
+ outEvent.eventSize_, chunkSize_);
+ continue;
+ }
+
+ long long chunk1 = offset_/chunkSize_;
+ long long chunk2 = (offset_ + outEvent.eventSize_ - 1)/chunkSize_;
+
+ // if adding this event will cross a chunk boundary, pad the chunk with zeros
+ if(chunk1 != chunk2) {
+ int padding = (int)(chunk2*chunkSize_ - offset_);
+
+ // sanity check
+ if (padding <= 0) {
+ DEBUG("Padding is empty, skipping event");
+ continue;
+ }
+ if (padding > (int32_t)chunkSize_) {
+ DEBUG("padding is larger than chunk size, skipping event");
+ continue;
+ }
+ // DEBUG("padding %d zeros to get to chunk %lld\n", padding, chunk2);
+ uint8_t zeros[padding];
+ bzero(zeros, padding);
+ if(-1 == ::write(fd_, zeros, padding)) {
+ perror("TBufferedFileWriter: error while padding zeros");
+ throw TTransportException("TBufferedFileWriter: error while padding zeros");
+ }
+ unflushed += padding;
+ offset_ += padding;
+ }
+ }
+
+ // write the dequeued event to the file
+ if(outEvent.eventSize_ > 0) {
+ if(-1 == ::write(fd_, outEvent.payLoad_, outEvent.eventSize_)) {
+ perror("TBufferedFileWriter: error while writing event");
+ // TODO: should this trigger an exception or simply continue?
+ throw TTransportException("TBufferedFileWriter: error while writing event");
+ }
+
+ // deallocate payload
+ free(outEvent.payLoad_);
+
+ unflushed += outEvent.eventSize_;
+ offset_ += outEvent.eventSize_;
+ }
+
+ // couple of cases from which a flush could be triggered
+ if((getCurrentTime() >= nextFlush && unflushed > 0) ||
+ unflushed > flushMaxBytes_ ||
+ (outEvent.eventSize_ == 0) ) {
+ //Debug("flushing %d bytes to %s (%d %d, full? %d)", unflushed, filename_.c_str(), headPos_, tailPos_, isFull_);
+
+ // sync (force flush) file to disk
+ fsync(fd_);
+ nextFlush = getCurrentTime() + flushMaxUs_;
+ unflushed = 0;
+
+ // notify anybody(thing?) waiting for flush completion
+ pthread_mutex_lock(&mutex_);
+ notFlushed_ = false;
+ pthread_mutex_unlock(&mutex_);
+ pthread_cond_broadcast(&flushed_);
+ }
+ }
+
+}
+
+}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TBufferedFileWriter.h b/lib/cpp/src/transport/TBufferedFileWriter.h
new file mode 100644
index 0000000..c327aab
--- /dev/null
+++ b/lib/cpp/src/transport/TBufferedFileWriter.h
@@ -0,0 +1,138 @@
+#ifndef _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_
+#define _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_ 1
+
+#include "TTransport.h"
+#include "Thrift.h"
+
+#include <string>
+
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace transport {
+
+using namespace boost;
+using std::string;
+
+// Data pertaining to a single event
+typedef struct eventInfo {
+ uint8_t* payLoad_;
+ uint32_t eventSize_;
+
+ eventInfo():payLoad_(NULL), eventSize_(0){};
+} eventInfo;
+
+
+/**
+ * Class that stores a circular in-memory event/message buffer and writes
+ * elements to disk when the buffer becomes full or a flush is triggered.
+ *
+ * @author Aditya Agarwal <aditya@facebook.com>
+ */
+class TBufferedFileWriter : public TTransport {
+ public:
+ void setFlushMaxUs(uint32_t flushMaxUs) {
+ flushMaxUs_ = flushMaxUs;
+ }
+ uint32_t getFlushMaxUs() {
+ return flushMaxUs_;
+ }
+
+ void setFlushMaxBytes(uint32_t flushMaxBytes) {
+ flushMaxBytes_ = flushMaxBytes;
+ }
+ uint32_t getFlushMaxBytes() {
+ return flushMaxBytes_;
+ }
+
+ void setChunkSize(uint32_t chunkSize) {
+ chunkSize_ = chunkSize;
+ }
+ uint32_t getChunkSize() {
+ return chunkSize_;
+ }
+
+ void setMaxEventSize(uint32_t maxEventSize) {
+ maxEventSize_ = maxEventSize;
+ }
+ uint32_t getMaxEventSize() {
+ return maxEventSize_;
+ }
+
+ TBufferedFileWriter(string filename, uint32_t sz);
+ TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset);
+ void init(string filename, uint32_t sz, int fd, long long offset);
+ ~TBufferedFileWriter();
+
+ void resetOutputFile(int fd, string filename, long long offset);
+
+ void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
+ void enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush);
+ void write(const uint8_t* buf, uint32_t len) {
+ enqueueEvent(buf, len, false);
+ }
+
+ eventInfo dequeueEvent(long long deadline);
+ void flush();
+
+ // control for writer thread
+ static void* startWriterThread(void* ptr) {
+ (((TBufferedFileWriter*)ptr)->writerThread());
+ return 0;
+ }
+ void writerThread();
+
+
+ private:
+ // circular buffer to hold data in before it is flushed. This is an array of strings. Each
+ // element of the array stores a msg that needs to be written to the file
+ eventInfo* buffer_;
+
+ // size of string buffer
+ uint32_t sz_;
+
+ // size of chunks that file will be split up into
+ uint32_t chunkSize_;
+
+ // max number of microseconds that can pass without flushing
+ uint32_t flushMaxUs_;
+
+ // max number of bytes that can be written without flushing
+ uint32_t flushMaxBytes_;
+
+ // max event size
+ uint32_t maxEventSize_;
+
+ // writer thread id
+ pthread_t writer_;
+
+ // variables that determine position of head/tail of circular buffer
+ int headPos_, tailPos_;
+
+ // variables indicating whether the buffer is full or empty
+ bool isFull_, isEmpty_;
+ pthread_cond_t notFull_, notEmpty_;
+ bool closing_;
+
+ // To keep track of whether the buffer has been flushed
+ pthread_cond_t flushed_;
+ bool notFlushed_;
+
+ // Mutex that is grabbed when enqueueing, dequeueing and flushing
+ // from the circular buffer
+ pthread_mutex_t mutex_;
+
+ // File information
+ string filename_;
+ int fd_;
+
+ // Offset within the file
+ long long offset_;
+
+ void openOutputFile();
+ uint32_t getCurrentTime();
+
+};
+
+}}}
+
+#endif // _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_
diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.cc b/lib/cpp/src/transport/TBufferedRouterTransport.cc
new file mode 100644
index 0000000..7c09953
--- /dev/null
+++ b/lib/cpp/src/transport/TBufferedRouterTransport.cc
@@ -0,0 +1,79 @@
+#include "TBufferedRouterTransport.h"
+#include "Thrift.h"
+using std::string;
+
+namespace facebook { namespace thrift { namespace transport {
+
+uint32_t TBufferedRouterTransport::read(uint8_t* buf, uint32_t len) {
+ uint32_t need = len;
+
+ // We don't have enough data yet
+ if (rLen_-rPos_ < need) {
+ // Copy out whatever we have
+ if (rLen_-rPos_ > 0) {
+ memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
+ need -= rLen_-rPos_;
+ buf += rLen_-rPos_;
+ rPos_ = rLen_;
+ }
+
+ // Double the size of the underlying buffer if it is full
+ if (rLen_ == rBufSize_) {
+ rBufSize_ *=2;
+ rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
+ }
+
+ // try to fill up the buffer
+ rLen_ += trans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
+ }
+
+ // Hand over whatever we have
+ uint32_t give = need;
+ if (rLen_-rPos_ < give) {
+ give = rLen_-rPos_;
+ }
+ memcpy(buf, rBuf_+rPos_, give);
+ rPos_ += give;
+ need -= give;
+
+ return (len - need);
+}
+
+void TBufferedRouterTransport::write(const uint8_t* buf, uint32_t len) {
+ if (len == 0) {
+ return;
+ }
+
+ if (len + wLen_ >= wBufSize_) {
+ uint32_t copy = wBufSize_ - wLen_;
+ memcpy(wBuf_ + wLen_, buf, copy);
+ trans_->write(wBuf_+wPos_, wBufSize_-wPos_);
+ wLen_ += copy;
+ wPos_ = wLen_;
+
+ uint32_t left = len-copy;
+ if (left > 0) {
+ // double the size of the write buffer
+ wBuf_ = (uint8_t *)realloc(wBuf_, sizeof(uint8_t) * wBufSize_ * 2);
+ memcpy(wBuf_ + wLen_, buf+copy, left);
+ wLen_ += left;
+ wBufSize_*=2;
+ }
+ } else {
+ memcpy(wBuf_+wLen_, buf, len);
+ wLen_ += len;
+ }
+}
+
+void TBufferedRouterTransport::flush() {
+ // Write out any data waiting in the write buffer
+ if (wLen_-wPos_ > 0) {
+ trans_->write(wBuf_+wPos_, wLen_-wPos_);
+ wPos_ = wLen_;
+ }
+
+ // Flush the underlying transport
+ trans_->flush();
+}
+
+}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.h b/lib/cpp/src/transport/TBufferedRouterTransport.h
new file mode 100644
index 0000000..b01faac
--- /dev/null
+++ b/lib/cpp/src/transport/TBufferedRouterTransport.h
@@ -0,0 +1,92 @@
+#ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_ 1
+
+#include "TTransport.h"
+#include "Thrift.h"
+#include <string>
+
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace transport {
+
+using namespace boost;
+
+/**
+ * BufferedRouterTransport. Funcationally equivalent to TBufferedTransport
+ * but routes the request to another Transport (typical use case is to route
+ * the request to TBufferedFileWriter to store the request on disk). The
+ * underlying buffer expands to a keep a copy of the entire request/response.
+ *
+ * @author Aditya Agarwal <aditya@facebook.com>
+ */
+class TBufferedRouterTransport : public TTransport {
+ public:
+ TBufferedRouterTransport(shared_ptr<TTransport> trans, shared_ptr<TTransport> rtrans) :
+ trans_(trans),
+ rtrans_(rtrans),
+ rBufSize_(512), rPos_(0), rLen_(0),
+ wBufSize_(512), wPos_(0), wLen_(0) {
+
+ rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_);
+ wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_);
+ }
+
+ TBufferedRouterTransport(shared_ptr<TTransport> trans, shared_ptr<TTransport> rtrans, uint32_t sz) :
+ trans_(trans),
+ rtrans_(rtrans),
+ rBufSize_(512), rPos_(0), rLen_(0),
+ wBufSize_(sz), wPos_(0), wLen_(0) {
+
+ rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_);
+ wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_);
+ }
+
+ ~TBufferedRouterTransport() {
+ free(rBuf_);
+ free(wBuf_);
+ }
+
+ bool isOpen() {
+ return trans_->isOpen();
+ }
+
+ void open() {
+ trans_->open();
+ }
+
+ void close() {
+ trans_->close();
+ }
+
+ uint32_t read(uint8_t* buf, uint32_t len);
+
+ void readEnd() {
+ rtrans_->write(rBuf_, rLen_);
+
+ // reset state
+ rLen_ = 0;
+ rPos_ = 0;
+ }
+
+ void write(const uint8_t* buf, uint32_t len);
+
+ void flush();
+
+ protected:
+ shared_ptr<TTransport> trans_;
+ shared_ptr<TTransport> rtrans_;
+
+ uint8_t* rBuf_;
+ uint32_t rBufSize_;
+ uint32_t rPos_;
+ uint32_t rLen_;
+
+ uint8_t* wBuf_;
+ uint32_t wBufSize_;
+ uint32_t wPos_;
+ uint32_t wLen_;
+};
+
+}}} // facebook::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TBufferedRouterTransportFactory.h b/lib/cpp/src/transport/TBufferedRouterTransportFactory.h
new file mode 100644
index 0000000..2b82570
--- /dev/null
+++ b/lib/cpp/src/transport/TBufferedRouterTransportFactory.h
@@ -0,0 +1,35 @@
+#ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORTFACTORY_H_
+#define _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORTFACTORY_H_ 1
+
+#include <transport/TTransportFactory.h>
+#include <transport/TBufferedRouterTransport.h>
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace transport {
+
+/**
+ * Wraps a transport into a bufferedRouter instance.
+ *
+ * @author Aditya Agarwal <aditya@facebook.com>
+ */
+class TBufferedRouterTransportFactory : public TTransportFactory {
+ public:
+ TBufferedRouterTransportFactory(boost::shared_ptr<TTransport> rTrans): rTrans_(rTrans) {}
+
+ virtual ~TBufferedRouterTransportFactory() {}
+
+ /**
+ * Wraps the transport into a buffered one.
+ */
+ virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
+ boost::shared_ptr<TTransport> buffered(new TBufferedRouterTransport(trans, rTrans_));
+ return std::make_pair(buffered, buffered);
+ }
+
+ private:
+ boost::shared_ptr<TTransport> rTrans_;
+};
+
+}}}
+
+#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORTFACTORY_H_