Thrift: Adding StatsProcessor, PeekProcessor, TPipedFileReaderTransport, and TPipedFileReaderTransportFactory classes
- StatsProcessor can be used to print events, or keep track of event frequency
- PeekProcessor is used to examine data in a thrift event, prior to passing it along to an underlying processor
- TPipedFileReaderTransport and its factory are used to pipe a TFileReaderTransport (which TFileProcessor requires)

Also fixed some bugs in TFileTransport - next flush time was overflowing and not always being reset

Reviewed by: aditya, mcslee

Test Plan: Tested using various thrift clients (scribe, falcon) and gdb in sandbox and on dev008.


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665066 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index b205e78..84618f4 100644
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -11,7 +11,8 @@
                     src/concurrency/PosixThreadFactory.cpp \
                     src/concurrency/ThreadManager.cpp \
                     src/concurrency/TimerManager.cpp \
-	            src/protocol/TBinaryProtocol.cpp \
+                    src/processor/PeekProcessor.cpp \
+                    src/protocol/TBinaryProtocol.cpp \
                     src/transport/TFileTransport.cpp \
                     src/transport/THttpClient.cpp \
                     src/transport/TSocket.cpp \
@@ -34,14 +35,14 @@
 
 include_thriftdir = $(includedir)/thrift
 include_thrift_HEADERS = \
-			 config.h \
-			 src/Thrift.h \
-			 src/TProcessor.h \
-			 src/TLogging.h
+                         config.h \
+                         src/Thrift.h \
+                         src/TProcessor.h \
+                         src/TLogging.h
 
 include_concurrencydir = $(include_thriftdir)/concurrency
 include_concurrency_HEADERS = \
-			 src/concurrency/Exception.h \
+                         src/concurrency/Exception.h \
                          src/concurrency/Mutex.h \
                          src/concurrency/Monitor.h \
                          src/concurrency/PosixThreadFactory.h \
diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h
index affbe81..ac77da3 100644
--- a/lib/cpp/src/TProcessor.h
+++ b/lib/cpp/src/TProcessor.h
@@ -28,7 +28,7 @@
   virtual bool process(boost::shared_ptr<protocol::TProtocol> in,
                        boost::shared_ptr<protocol::TProtocol> out) = 0;
 
-  bool process(boost::shared_ptr<facebook::thrift::protocol::TProtocol> io) {
+  virtual bool process(boost::shared_ptr<facebook::thrift::protocol::TProtocol> io) {
     return process(io, io);
   }
 
diff --git a/lib/cpp/src/processor/PeekProcessor.cpp b/lib/cpp/src/processor/PeekProcessor.cpp
new file mode 100644
index 0000000..a7c5571
--- /dev/null
+++ b/lib/cpp/src/processor/PeekProcessor.cpp
@@ -0,0 +1,84 @@
+#include "PeekProcessor.h"
+
+namespace facebook { namespace thrift { namespace processor { 
+
+PeekProcessor::PeekProcessor() {
+  memoryBuffer_.reset(new facebook::thrift::transport::TMemoryBuffer());
+}
+PeekProcessor::~PeekProcessor() {}
+
+void PeekProcessor::initialize(boost::shared_ptr<facebook::thrift::TProcessor> actualProcessor,
+                    boost::shared_ptr<facebook::thrift::protocol::TProtocolFactory> protocolFactory,
+                    boost::shared_ptr<facebook::thrift::transport::TPipedTransportFactory> transportFactory) {
+  actualProcessor_ = actualProcessor;
+  pipedProtocol_ = protocolFactory->getProtocol(memoryBuffer_);
+  transportFactory_ = transportFactory;
+  transportFactory_->initializeTargetTransport(memoryBuffer_);
+}
+
+boost::shared_ptr<facebook::thrift::transport::TTransport> PeekProcessor::getPipedTransport(boost::shared_ptr<facebook::thrift::transport::TTransport> in) {
+  return transportFactory_->getTransport(in);
+}
+
+// Reads one T_CALL message from in, offering its name and each field to the
+// peek hooks, then replays the buffered message through actualProcessor_ and
+// returns its result. Throws TException for any other message type.
+bool PeekProcessor::process(boost::shared_ptr<facebook::thrift::protocol::TProtocol> in,
+                            boost::shared_ptr<facebook::thrift::protocol::TProtocol> out) {
+  std::string fname;
+  facebook::thrift::protocol::TMessageType mtype;
+  int32_t seqid;
+  in->readMessageBegin(fname, mtype, seqid);
+
+  if (mtype != facebook::thrift::protocol::T_CALL) {
+    throw facebook::thrift::TException("Unexpected message type");
+  }
+
+  // Peek at the name
+  peekName(fname);
+
+  facebook::thrift::protocol::TType ftype;
+  int16_t fid;
+  while (true) {
+    in->readFieldBegin(fname, ftype, fid);
+    if (ftype == facebook::thrift::protocol::T_STOP) {
+      break;
+    }
+    // Peek at the variable, then close the field: every readFieldBegin needs
+    // its readFieldEnd so that protocols which frame fields stay in sync
+    peek(in, ftype, fid);
+    in->readFieldEnd();
+  }
+  in->readMessageEnd();
+  in->getTransport()->readEnd();
+
+  // Done peeking at variables
+  peekEnd();
+
+  // All the data is now in memoryBuffer_ and ready to be processed;
+  // let's first take a peek at the full data in memory
+  uint8_t* buffer;
+  uint32_t size;
+  memoryBuffer_->getBuffer(&buffer, &size);
+  peekBuffer(buffer, size);
+
+  bool ret = actualProcessor_->process(pipedProtocol_, out);
+  memoryBuffer_->resetBuffer();
+  return ret;
+}
+
+void PeekProcessor::peekName(const std::string& fname) {
+}
+
+void PeekProcessor::peekBuffer(uint8_t* buffer, uint32_t size) {
+}
+
+void PeekProcessor::peek(boost::shared_ptr<facebook::thrift::protocol::TProtocol> in, 
+                  facebook::thrift::protocol::TType ftype,
+                  int16_t fid) {
+  in->skip(ftype);
+}
+
+void PeekProcessor::peekEnd() {}
+
+}}}
diff --git a/lib/cpp/src/processor/PeekProcessor.h b/lib/cpp/src/processor/PeekProcessor.h
new file mode 100644
index 0000000..d1d227c
--- /dev/null
+++ b/lib/cpp/src/processor/PeekProcessor.h
@@ -0,0 +1,61 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef PEEKPROCESSOR_H
+#define PEEKPROCESSOR_H
+
+#include <string>
+#include <TProcessor.h>
+#include <transport/TTransport.h>
+#include <transport/TTransportUtils.h>
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace processor { 
+
+/*
+ * Class for peeking at the raw data that is being processed by another processor
+ * and gives the derived class a chance to change behavior accordingly
+ *
+ * @author James Wang <jwang@facebook.com>
+ */
+class PeekProcessor : public facebook::thrift::TProcessor {
+  
+ public:
+  PeekProcessor();
+  virtual ~PeekProcessor();
+
+  // Input here: actualProcessor  - the underlying processor
+  //             protocolFactory  - the protocol factory used to wrap the memory buffer
+  //             transportFactory - this TPipedTransportFactory is used to wrap the source transport
+  //                                via a call to getPipedTransport
+  void initialize(boost::shared_ptr<facebook::thrift::TProcessor> actualProcessor,
+                  boost::shared_ptr<facebook::thrift::protocol::TProtocolFactory> protocolFactory,
+                  boost::shared_ptr<facebook::thrift::transport::TPipedTransportFactory> transportFactory);
+
+  boost::shared_ptr<facebook::thrift::transport::TTransport> getPipedTransport(boost::shared_ptr<facebook::thrift::transport::TTransport> in);
+
+  virtual bool process(boost::shared_ptr<facebook::thrift::protocol::TProtocol> in, 
+                       boost::shared_ptr<facebook::thrift::protocol::TProtocol> out);
+
+  // The following four functions can be overridden by child classes to
+  // achieve the desired peeking behavior
+  virtual void peekName(const std::string& fname);
+  virtual void peekBuffer(uint8_t* buffer, uint32_t size);
+  virtual void peek(boost::shared_ptr<facebook::thrift::protocol::TProtocol> in, 
+                    facebook::thrift::protocol::TType ftype,
+                    int16_t fid);
+  virtual void peekEnd();
+
+ private:
+  boost::shared_ptr<facebook::thrift::TProcessor> actualProcessor_;
+  boost::shared_ptr<facebook::thrift::protocol::TProtocol> pipedProtocol_;
+  boost::shared_ptr<facebook::thrift::transport::TPipedTransportFactory> transportFactory_;
+  boost::shared_ptr<facebook::thrift::transport::TMemoryBuffer> memoryBuffer_;
+};
+
+}}} // facebook::thrift::processor
+
+#endif
diff --git a/lib/cpp/src/processor/StatsProcessor.h b/lib/cpp/src/processor/StatsProcessor.h
new file mode 100644
index 0000000..e080432
--- /dev/null
+++ b/lib/cpp/src/processor/StatsProcessor.h
@@ -0,0 +1,252 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef STATSPROCESSOR_H
+#define STATSPROCESSOR_H
+
+#include <boost/shared_ptr.hpp>
+#include <transport/TTransport.h>
+#include <protocol/TProtocol.h>
+#include <TProcessor.h>
+
+namespace facebook { namespace thrift { namespace processor { 
+
+/*
+ * Class for keeping track of function call statistics and printing them if desired
+ *
+ * @author James Wang <jwang@facebook.com>
+ */
+class StatsProcessor : public facebook::thrift::TProcessor {
+public:
+  StatsProcessor(bool print, bool frequency) 
+    : print_(print),
+      frequency_(frequency)
+  {}
+  virtual ~StatsProcessor() {};
+
+  // Consumes one T_CALL message from piprot: optionally prints it as
+  // "name (arg, arg)" and/or counts the call in frequency_map_. poprot is not
+  // used. Returns true; throws TException for any other message type.
+  virtual bool process(boost::shared_ptr<facebook::thrift::protocol::TProtocol> piprot, boost::shared_ptr<facebook::thrift::protocol::TProtocol> poprot) {
+    piprot_ = piprot;
+    std::string fname;
+    facebook::thrift::protocol::TMessageType mtype;
+    int32_t seqid;
+    piprot_->readMessageBegin(fname, mtype, seqid);
+    if (mtype != facebook::thrift::protocol::T_CALL) {
+      if (print_) {
+        printf("Unknown message type\n");
+      }
+      throw facebook::thrift::TException("Unexpected message type");
+    }
+    if (print_) {
+      printf("%s (", fname.c_str());
+    }
+    if (frequency_) {
+      // operator[] value-initializes a missing counter to 0 before the increment
+      ++frequency_map_[fname];
+    }
+
+    facebook::thrift::protocol::TType ftype;
+    int16_t fid;
+    while (true) {
+      piprot_->readFieldBegin(fname, ftype, fid);
+      if (ftype == facebook::thrift::protocol::T_STOP) {
+        break;
+      }
+      printAndPassToBuffer(ftype);
+      if (print_) {
+        printf(", ");
+      }
+      // close the field, matching what the nested-struct loop already does
+      piprot_->readFieldEnd();
+    }
+    // balance readMessageBegin so the protocol can finish the message
+    piprot_->readMessageEnd();
+
+    if (print_) {
+      // the backspaces step back over the ", " printed after the last argument
+      printf("\b\b)\n");
+    }
+    return true;
+  }
+
+  const std::map<std::string, long long>& get_frequency_map() {
+    return frequency_map_;
+  }
+
+protected:
+  // Reads one value of type ftype from piprot_ and, when print_ is set, writes
+  // it to stdout. Structs, maps, sets and lists are handled by recursing on
+  // their members; the "\b" in each closing printf backs up over the separator
+  // printed after the last member. Despite the name, nothing visible here is
+  // passed on to any buffer - the value is only consumed.
+  void printAndPassToBuffer(facebook::thrift::protocol::TType ftype) {
+    switch (ftype) {
+      case facebook::thrift::protocol::T_BOOL: {
+        bool boolv;
+        piprot_->readBool(boolv);
+        if (print_) {
+          printf("%d", boolv);
+        }
+        break;
+      }
+      case facebook::thrift::protocol::T_BYTE: {
+        int8_t bytev;
+        piprot_->readByte(bytev);
+        if (print_) {
+          printf("%d", bytev);
+        }
+        break;
+      }
+      case facebook::thrift::protocol::T_I16: {
+        int16_t i16;
+        piprot_->readI16(i16);
+        if (print_) {
+          printf("%d", i16);
+        }
+        break;
+      }
+      case facebook::thrift::protocol::T_I32: {
+        int32_t i32;
+        piprot_->readI32(i32);
+        if (print_) {
+          printf("%d", i32);
+        }
+        break;
+      }
+      case facebook::thrift::protocol::T_I64: {
+        int64_t i64;
+        piprot_->readI64(i64);
+        if (print_) {
+          // int64_t is not long on every platform; cast so the argument matches %lld
+          printf("%lld", static_cast<long long>(i64));
+        }
+        break;
+      }
+      case facebook::thrift::protocol::T_DOUBLE: {
+        double dub;
+        piprot_->readDouble(dub);
+        if (print_) {
+          printf("%f", dub);
+        }
+        break;
+      }
+      case facebook::thrift::protocol::T_STRING: {
+        std::string str;
+        piprot_->readString(str);
+        if (print_) {
+          printf("%s", str.c_str());
+        }
+        break;
+      }
+      case facebook::thrift::protocol::T_STRUCT: {
+        std::string name;
+        int16_t fieldId;
+        facebook::thrift::protocol::TType fieldType;
+        piprot_->readStructBegin(name);
+        if (print_) {
+          printf("<");
+        }
+        // walk the nested fields until the T_STOP marker, recursing on each value
+        while (true) {
+          piprot_->readFieldBegin(name, fieldType, fieldId);
+          if (fieldType == facebook::thrift::protocol::T_STOP) {
+            break;
+          }
+          printAndPassToBuffer(fieldType);
+          if (print_) {
+            printf(",");
+          }
+          piprot_->readFieldEnd();
+        }
+        piprot_->readStructEnd();
+        if (print_) {
+          printf("\b>");
+        }
+        break;
+      }
+      case facebook::thrift::protocol::T_MAP: {
+        facebook::thrift::protocol::TType keyType;
+        facebook::thrift::protocol::TType valType;
+        uint32_t i, size;
+        piprot_->readMapBegin(keyType, valType, size);
+        if (print_) {
+          printf("{");
+        }
+        // each entry is read (and printed) as key=>value
+        for (i = 0; i < size; i++) {
+          printAndPassToBuffer(keyType);
+          if (print_) {
+            printf("=>");
+          }
+          printAndPassToBuffer(valType);
+          if (print_) {
+            printf(",");
+          }
+        }
+        piprot_->readMapEnd();
+        if (print_) {
+          printf("\b}");
+        }
+        break;
+      }
+      case facebook::thrift::protocol::T_SET: {
+        facebook::thrift::protocol::TType elemType;
+        uint32_t i, size;
+        piprot_->readSetBegin(elemType, size);
+        if (print_) {
+          printf("{");
+        }
+        // size elements follow, all of type elemType
+        for (i = 0; i < size; i++) {
+          printAndPassToBuffer(elemType);
+          if (print_) {
+            printf(",");
+          }
+        }
+        piprot_->readSetEnd();
+        if (print_) {
+          printf("\b}");
+        }
+        break;
+      }
+      case facebook::thrift::protocol::T_LIST: {
+        facebook::thrift::protocol::TType elemType;
+        uint32_t i, size;
+        piprot_->readListBegin(elemType, size);
+        if (print_) {
+          printf("[");
+        }
+        // size elements follow, all of type elemType
+        for (i = 0; i < size; i++) {
+          printAndPassToBuffer(elemType);
+          if (print_) {
+            printf(",");
+          }
+        }
+        piprot_->readListEnd();
+        if (print_) {
+          printf("\b]");
+        }
+        break;
+      }
+      default:
+        // any other type is neither read nor printed
+        break;
+    }
+  }
+
+  boost::shared_ptr<facebook::thrift::protocol::TProtocol> piprot_;
+  std::map<std::string, long long> frequency_map_;
+
+  bool print_;
+  bool frequency_;
+};
+
+}}} // facebook::thrift::processor
+
+#endif
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index 9a3ca30..001783f 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -204,27 +204,14 @@
   pthread_mutex_unlock(&mutex_);
 }
 
-bool TFileTransport::swapEventBuffers(long long deadline) {
-  //deadline time struc
-  struct timespec ts;
-  if (deadline) {
-    ts.tv_sec = deadline/(1000*1000);
-    ts.tv_nsec = (deadline%(1000*1000))*1000;
-  }
-
-  // wait for the queue to fill up
+bool TFileTransport::swapEventBuffers(struct timespec* deadline) {
   pthread_mutex_lock(&mutex_);
-  while (enqueueBuffer_->isEmpty()) {
-    // do a timed wait on the condition variable
-    if (deadline) {
-      int e = pthread_cond_timedwait(&notEmpty_, &mutex_, &ts);
-      if(e == ETIMEDOUT) {
-        break;
-      }
-    } else {
-      // just wait until the buffer gets an item
-      pthread_cond_wait(&notEmpty_, &mutex_);
-    }
+  if (deadline != NULL) {
+    // if we were handed a deadline time struct, do a timed wait
+    pthread_cond_timedwait(&notEmpty_, &mutex_, deadline);
+  } else {
+    // just wait until the buffer gets an item
+    pthread_cond_wait(&notEmpty_, &mutex_);
   }
 
   bool swapped = false;
@@ -259,7 +246,9 @@
   offset_ = lseek(fd_, 0, SEEK_END);
 
   // Figure out the next time by which a flush must take place
-  long long nextFlush = getCurrentTime() + flushMaxUs_;
+
+  struct timespec ts_next_flush;
+  getNextFlushTime(&ts_next_flush);
   uint32_t unflushed = 0;
 
   while(1) {
@@ -273,7 +262,7 @@
       return;
     }
 
-    if (swapEventBuffers(nextFlush)) {
+    if (swapEventBuffers(&ts_next_flush)) {
       eventInfo* outEvent;
       while (NULL != (outEvent = dequeueBuffer_->getNext())) {
         if (!outEvent) {
@@ -342,14 +331,23 @@
       dequeueBuffer_->reset();
     }
 
+    bool flushTimeElapsed = false;
+    struct timespec current_time;
+    clock_gettime(CLOCK_REALTIME, &current_time);
+
+    if (current_time.tv_sec > ts_next_flush.tv_sec ||
+        (current_time.tv_sec == ts_next_flush.tv_sec && current_time.tv_nsec > ts_next_flush.tv_nsec)) {
+      flushTimeElapsed = true;
+      getNextFlushTime(&ts_next_flush);
+    }
+
     // couple of cases from which a flush could be triggered
-    if ((getCurrentTime() >= nextFlush && unflushed > 0) ||
+    if ((flushTimeElapsed && unflushed > 0) ||
        unflushed > flushMaxBytes_ ||
        forceFlush_) {
 
       // sync (force flush) file to disk
       fsync(fd_);
-      nextFlush = getCurrentTime() + flushMaxUs_;
       unflushed = 0;
 
       // notify anybody waiting for flush completion
@@ -697,13 +695,14 @@
   offset_ = lseek(fd_, 0, SEEK_CUR);
 }
 
-uint32_t TFileTransport::getCurrentTime() {
-  long long ret;
-  struct timeval tv;
-  gettimeofday(&tv, NULL);
-  ret = tv.tv_sec;
-  ret = ret*1000*1000 + tv.tv_usec;
-  return ret;
+void TFileTransport::getNextFlushTime(struct timespec* ts_next_flush) {
+  clock_gettime(CLOCK_REALTIME, ts_next_flush);  // deadline = now + flushMaxUs_ microseconds
+  ts_next_flush->tv_nsec += (flushMaxUs_ % 1000000) * 1000;
+  if (ts_next_flush->tv_nsec >= 1000000000) {  // >= so tv_nsec always ends up below one second
+    ts_next_flush->tv_nsec -= 1000000000;
+    ts_next_flush->tv_sec += 1;
+  }
+  ts_next_flush->tv_sec += flushMaxUs_ / 1000000;
 }
 
 TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
@@ -773,7 +772,7 @@
 
 TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
                                shared_ptr<TProtocolFactory> protocolFactory,
-                               shared_ptr<TFileTransport> inputTransport):
+                               shared_ptr<TFileReaderTransport> inputTransport):
   processor_(processor), 
   inputProtocolFactory_(protocolFactory), 
   outputProtocolFactory_(protocolFactory), 
@@ -786,7 +785,7 @@
 TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
                                shared_ptr<TProtocolFactory> inputProtocolFactory,
                                shared_ptr<TProtocolFactory> outputProtocolFactory,
-                               shared_ptr<TFileTransport> inputTransport):
+                               shared_ptr<TFileReaderTransport> inputTransport):
   processor_(processor), 
   inputProtocolFactory_(inputProtocolFactory), 
   outputProtocolFactory_(outputProtocolFactory), 
@@ -798,7 +797,7 @@
 
 TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
                                shared_ptr<TProtocolFactory> protocolFactory,
-                               shared_ptr<TFileTransport> inputTransport,
+                               shared_ptr<TFileReaderTransport> inputTransport,
                                shared_ptr<TTransport> outputTransport):
   processor_(processor), 
   inputProtocolFactory_(protocolFactory), 
diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h
index f4d25f4..e31e85c 100644
--- a/lib/cpp/src/transport/TFileTransport.h
+++ b/lib/cpp/src/transport/TFileTransport.h
@@ -122,12 +122,36 @@
 };
 
 /**
+ * Abstract interface for transports used to read files
+ */
+class TFileReaderTransport : virtual public TTransport {
+ public:
+  virtual int32_t getReadTimeout() = 0;
+  virtual void setReadTimeout(int32_t readTimeout) = 0;
+
+  virtual uint32_t getNumChunks() = 0;
+  virtual uint32_t getCurChunk() = 0;
+  virtual void seekToChunk(int32_t chunk) = 0;
+  virtual void seekToEnd() = 0;
+};
+
+/**
+ * Abstract interface for transports used to write files
+ */
+class TFileWriterTransport : virtual public TTransport {
+ public:
+  virtual uint32_t getChunkSize() = 0;
+  virtual void setChunkSize(uint32_t chunkSize) = 0;
+};
+
+/**
  * File implementation of a transport. Reads and writes are done to a 
  * file on disk.
  *
  * @author Aditya Agarwal <aditya@facebook.com>
  */
-class TFileTransport : public TTransport {
+class TFileTransport : public TFileReaderTransport,
+                       public TFileWriterTransport {
  public:
   TFileTransport(std::string path);
   ~TFileTransport();
@@ -240,7 +264,7 @@
  private:
   // helper functions for writing to a file
   void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
-  bool swapEventBuffers(long long deadline);
+  bool swapEventBuffers(struct timespec* deadline);
   bool initBufferAndWriteThread();
 
   // control for writer thread
@@ -259,7 +283,7 @@
 
   // Utility functions
   void openLogFile();
-  uint32_t getCurrentTime();
+  void getNextFlushTime(struct timespec* ts_next_flush);
 
   // Class variables
   readState readState_;
@@ -358,12 +382,12 @@
    */
   TFileProcessor(boost::shared_ptr<TProcessor> processor,
                  boost::shared_ptr<TProtocolFactory> protocolFactory,
-                 boost::shared_ptr<TFileTransport> inputTransport);
+                 boost::shared_ptr<TFileReaderTransport> inputTransport);
 
   TFileProcessor(boost::shared_ptr<TProcessor> processor,
                  boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
                  boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
-                 boost::shared_ptr<TFileTransport> inputTransport);
+                 boost::shared_ptr<TFileReaderTransport> inputTransport);
 
   /** 
    * Constructor
@@ -375,7 +399,7 @@
    */    
   TFileProcessor(boost::shared_ptr<TProcessor> processor,
                  boost::shared_ptr<TProtocolFactory> protocolFactory,
-                 boost::shared_ptr<TFileTransport> inputTransport,
+                 boost::shared_ptr<TFileReaderTransport> inputTransport,
                  boost::shared_ptr<TTransport> outputTransport);
 
   /**
@@ -396,7 +420,7 @@
   boost::shared_ptr<TProcessor> processor_;
   boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
   boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
-  boost::shared_ptr<TFileTransport> inputTransport_;
+  boost::shared_ptr<TFileReaderTransport> inputTransport_;
   boost::shared_ptr<TTransport> outputTransport_;
 };
 
diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp
index aaefb2c..290b483 100644
--- a/lib/cpp/src/transport/TTransportUtils.cpp
+++ b/lib/cpp/src/transport/TTransportUtils.cpp
@@ -296,4 +296,87 @@
   srcTrans_->flush();
 }
 
+TPipedFileReaderTransport::TPipedFileReaderTransport(boost::shared_ptr<TFileReaderTransport> srcTrans, boost::shared_ptr<TTransport> dstTrans) 
+  : TPipedTransport(srcTrans, dstTrans),
+    srcTrans_(srcTrans) {
+}
+
+TPipedFileReaderTransport::~TPipedFileReaderTransport() {
+}
+
+bool TPipedFileReaderTransport::isOpen() {
+  return TPipedTransport::isOpen();
+}
+
+bool TPipedFileReaderTransport::peek() {
+  return TPipedTransport::peek();
+}
+
+void TPipedFileReaderTransport::open() {
+  TPipedTransport::open();
+}
+
+void TPipedFileReaderTransport::close() {
+  TPipedTransport::close();
+}
+
+uint32_t TPipedFileReaderTransport::read(uint8_t* buf, uint32_t len) {
+  return TPipedTransport::read(buf, len);
+}
+
+// Fills buf with len bytes by calling read() repeatedly; throws TEOFException
+// as soon as a read() yields no data. Returns the number of bytes read.
+uint32_t TPipedFileReaderTransport::readAll(uint8_t* buf, uint32_t len) {
+  uint32_t have = 0;
+
+  while (have < len) {
+    const uint32_t got = read(buf + have, len - have);
+    if (got == 0) {  // got is unsigned, so zero is the only "no data" value
+      throw TEOFException();
+    }
+    have += got;
+  }
+  return have;
+}
+
+void TPipedFileReaderTransport::readEnd() {
+  TPipedTransport::readEnd();
+}
+
+void TPipedFileReaderTransport::write(const uint8_t* buf, uint32_t len) {
+  TPipedTransport::write(buf, len);
+}
+
+void TPipedFileReaderTransport::writeEnd() {
+  TPipedTransport::writeEnd();
+}
+
+void TPipedFileReaderTransport::flush() {
+  TPipedTransport::flush();
+}
+
+int32_t TPipedFileReaderTransport::getReadTimeout() {
+  return srcTrans_->getReadTimeout();
+}
+
+void TPipedFileReaderTransport::setReadTimeout(int32_t readTimeout) {
+  srcTrans_->setReadTimeout(readTimeout);
+}
+
+uint32_t TPipedFileReaderTransport::getNumChunks() {
+  return srcTrans_->getNumChunks();
+}
+
+uint32_t TPipedFileReaderTransport::getCurChunk() {
+  return srcTrans_->getCurChunk();
+}
+
+void TPipedFileReaderTransport::seekToChunk(int32_t chunk) {
+  srcTrans_->seekToChunk(chunk);
+}
+
+void TPipedFileReaderTransport::seekToEnd() {
+  srcTrans_->seekToEnd();
+}
+
 }}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h
index f95ad76..653939f 100644
--- a/lib/cpp/src/transport/TTransportUtils.h
+++ b/lib/cpp/src/transport/TTransportUtils.h
@@ -7,7 +7,9 @@
 #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_
 #define _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_ 1
 
+#include <string>
 #include <transport/TTransport.h>
+#include <transport/TFileTransport.h>
 
 namespace facebook { namespace thrift { namespace transport { 
 
@@ -349,7 +351,7 @@
  *
  * @author Aditya Agarwal <aditya@facebook.com>
  */
-class TPipedTransport : public TTransport {
+class TPipedTransport : virtual public TTransport {
  public:
   TPipedTransport(boost::shared_ptr<TTransport> srcTrans, 
                   boost::shared_ptr<TTransport> dstTrans) :
@@ -465,8 +467,10 @@
  */
 class TPipedTransportFactory : public TTransportFactory {
  public:
-  TPipedTransportFactory(boost::shared_ptr<TTransport> dstTrans): dstTrans_(dstTrans) {}
-
+  TPipedTransportFactory() {}
+  TPipedTransportFactory(boost::shared_ptr<TTransport> dstTrans) {
+    initializeTargetTransport(dstTrans);
+  }
   virtual ~TPipedTransportFactory() {}
 
   /**
@@ -476,12 +480,85 @@
     return boost::shared_ptr<TTransport>(new TPipedTransport(srcTrans, dstTrans_));
   }
 
- private:
+  virtual void initializeTargetTransport(boost::shared_ptr<TTransport> dstTrans) {
+    if (dstTrans_.get() == NULL) {
+      dstTrans_ = dstTrans;
+    } else {
+      throw TException("Target transport already initialized");
+    }
+  }
+
+ protected:
   boost::shared_ptr<TTransport> dstTrans_;
 };
 
+/**
+ * TPipedFileReaderTransport. This is just like a TPipedTransport, except
+ * that it also implements TFileReaderTransport by forwarding those calls to
+ * the source transport, so it can be used where a file reader is required.
+ *
+ * @author James Wang <jwang@facebook.com>
+ */
+class TPipedFileReaderTransport : public TPipedTransport,
+                                  public TFileReaderTransport {
+ public:
+  TPipedFileReaderTransport(boost::shared_ptr<TFileReaderTransport> srcTrans, boost::shared_ptr<TTransport> dstTrans);
+
+  ~TPipedFileReaderTransport();
+
+  // TTransport functions
+  bool isOpen();
+  bool peek();
+  void open();
+  void close();
+  uint32_t read(uint8_t* buf, uint32_t len);
+  uint32_t readAll(uint8_t* buf, uint32_t len);
+  void readEnd();
+  void write(const uint8_t* buf, uint32_t len);
+  void writeEnd();
+  void flush();
+
+  // TFileReaderTransport functions
+  int32_t getReadTimeout();
+  void setReadTimeout(int32_t readTimeout);
+  uint32_t getNumChunks();
+  uint32_t getCurChunk();
+  void seekToChunk(int32_t chunk);
+  void seekToEnd();
+
+ protected:
+  // shouldn't be used
+  TPipedFileReaderTransport();
+  boost::shared_ptr<TFileReaderTransport> srcTrans_;
+};
+
+/**
+ * Creates a TPipedFileReaderTransport from a source TFileReaderTransport and a destination transport
+ *
+ * @author James Wang <jwang@facebook.com>
+ */
+class TPipedFileReaderTransportFactory : public TPipedTransportFactory {
+ public:
+  TPipedFileReaderTransportFactory() {}
+  TPipedFileReaderTransportFactory(boost::shared_ptr<TTransport> dstTrans) 
+    : TPipedTransportFactory(dstTrans)
+  {}
+  virtual ~TPipedFileReaderTransportFactory() {}
+
+  boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> srcTrans) {
+    boost::shared_ptr<TFileReaderTransport> pFileReaderTransport = boost::dynamic_pointer_cast<TFileReaderTransport>(srcTrans);
+    if (pFileReaderTransport.get() != NULL) {
+      return getFileReaderTransport(pFileReaderTransport);
+    } else {
+      return boost::shared_ptr<TTransport>();
+    }
+  }
+
+  boost::shared_ptr<TFileReaderTransport> getFileReaderTransport(boost::shared_ptr<TFileReaderTransport> srcTrans) {
+    return boost::shared_ptr<TFileReaderTransport>(new TPipedFileReaderTransport(srcTrans, dstTrans_));
+  }
+};
 
 }}} // facebook::thrift::transport
 
-
 #endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_