-- TFileTransport (Thrift Logfile)

Summary:
-- TBufferedFileWriter.h/cpp will be renamed to TFileTransport.h/cpp in the next commit.
-- TFileTransport essentially reads and writes Thrift calls to/from a file instead of a
   socket.
-- The code/design is somewhat similar to pillar_logfile but there are some significant changes.

todo:
-- Still need to implement error detection/correction.

Reviewed By: Mark Slee

Test Plan:
-- Wrote a test in thrift/test/cpp/src/main.cpp that appends to a file and replays the requests.

Notes:
It's finally time to port search over to Thrift


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664889 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TBufferedFileWriter.cpp b/lib/cpp/src/transport/TBufferedFileWriter.cpp
index c3ed250..39e2074 100644
--- a/lib/cpp/src/transport/TBufferedFileWriter.cpp
+++ b/lib/cpp/src/transport/TBufferedFileWriter.cpp
@@ -1,43 +1,34 @@
 #include "TBufferedFileWriter.h"
+#include "TTransportUtils.h"
 
 #include <pthread.h>
-#include <cassert>
-#include <cstdlib>
-#include <string>
-#include <sys/time.h>
-#include <sys/types.h>
+ #include <sys/time.h>
 #include <fcntl.h>
 #include <errno.h>
+#include <unistd.h>
+#include <iostream>
 
-using std::string;
+using namespace std;
 
 namespace facebook { namespace thrift { namespace transport { 
 
-TBufferedFileWriter::TBufferedFileWriter(string filename, uint32_t sz) {
-  init(filename, sz, 0, 0);
-}
+TFileTransport::TFileTransport(string path) {
+  filename_ = path;
+  openLogFile();
 
-TBufferedFileWriter::TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset) {
-  init(filename, sz, fd, offset);
-}
+  // set initial values to default
+  readBuffSize_ = DEFAULT_READ_BUFF_SIZE;
+  readTimeout_ = DEFAULT_READ_TIMEOUT_MS;
+  chunkSize_ = DEFAULT_CHUNK_SIZE;
+  eventBufferSize_ = DEFAULT_EVENT_BUFFER_SIZE;
+  flushMaxUs_ = DEFAULT_FLUSH_MAX_US;
+  flushMaxBytes_ = DEFAULT_FLUSH_MAX_BYTES;
+  maxEventSize_ = DEFAULT_MAX_EVENT_SIZE;
+  maxCorruptedEvents_ = DEFAULT_MAX_CORRUPTED_EVENTS;
+  eofSleepTime_ = DEFAULT_EOF_SLEEP_TIME_US;
 
-void TBufferedFileWriter::init(string filename, uint32_t sz, int fd, long long offset) {
-  // validate buffer size
-  sz_ = sz;
-  if (sz_ <= 0) {
-    throw TTransportException("invalid input buffer size");
-  }
-
-  // set file-related variables
-  fd_ = 0;
-  resetOutputFile(fd, filename, offset);
-
-  // set default values of flush related params
-  flushMaxBytes_ = 1024 * 100;
-  flushMaxUs_ = 20 * 1000;
-
-  // allocate event buffer
-  buffer_ = new eventInfo[sz_];
+  // initialize buffer lazily
+  buffer_ = 0;
 
   // buffer is initially empty
   isEmpty_ = true;
@@ -47,9 +38,6 @@
   headPos_ = 0;
   tailPos_ = 0;
 
-  // for lack of a better option, set chunk size to 0. Users can change this to whatever they want
-  chunkSize_ = 0;
-
   // initialize all the condition vars/mutexes
   pthread_mutex_init(&mutex_, NULL);
   pthread_cond_init(&notFull_, NULL);
@@ -59,42 +47,70 @@
   // not closing the file during init
   closing_ = false;
 
-  // spawn writer thread
-  pthread_create(&writer_, NULL, startWriterThread, (void *)this);
+  // create writer thread on demand
+  writerThreadId_ = 0;
+
+  // read related variables
+  // read buff initialized lazily
+  readBuff_ = 0;
+  currentEvent_ = 0;
 }
 
-void TBufferedFileWriter::resetOutputFile(int fd, string filename, long long offset) {
+void TFileTransport::resetOutputFile(int fd, string filename, long long offset) {
   filename_ = filename;
   offset_ = offset;
 
   // check if current file is still open
   if (fd_ > 0) {
-    // TODO: unclear if this should throw an error
-    fprintf(stderr, "error, current file not closed (trying to open %s)\n", filename_.c_str());
+    // TODO: should there be a flush here?
+    fprintf(stderr, "error, current file (%s) not closed\n", filename_.c_str());
     ::close(fd_);
   }
-  fd_ = fd;
+
+  if (fd) {
+    fd_ = fd;
+  } else {
+    // open file if the input fd is 0
+    openLogFile();
+  }
 }
 
 
-TBufferedFileWriter::~TBufferedFileWriter() {
-  // flush output buffer
-  flush();
+TFileTransport::~TFileTransport() {
+  // TODO: Make sure the buffer is actually flushed
+  // flush the buffer if a writer thread is active
+  if (writerThreadId_ > 0) {
+    // flush output buffer
+    flush();
 
-  // send a signal to write thread to end
-  closing_ = true;
-  pthread_join(writer_, NULL);
+    // send a signal to write thread to end
+    closing_ = true;
+    pthread_join(writerThreadId_, NULL);
+  }
 
-  delete[] buffer_;
+  if (buffer_) {
+    delete[] buffer_;
+  }
 
-  // TODO: should the file be closed here?
+  if (readBuff_) {
+    delete readBuff_;
+  }
+
+  if (currentEvent_) {
+    delete currentEvent_;
+  }
+
+  // close logfile
+  if (fd_ > 0) {
+    ::close(fd_);
+  }
 }
 
 
-void TBufferedFileWriter::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
+void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
   // make sure that event size is valid
   if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) {
-    //    T_ERROR("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_);
+    T_DEBUG("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_);
     return;
   }
 
@@ -103,17 +119,33 @@
     return;
   }
 
-  eventInfo toEnqueue;
-  uint8_t* bufCopy = (uint8_t *)malloc(sizeof(uint8_t) * eventLen);
-  toEnqueue.payLoad_ = bufCopy;
-  toEnqueue.eventSize_ = eventLen;
+  eventInfo* toEnqueue = new eventInfo();
+  toEnqueue->eventBuff_ = (uint8_t *)malloc((sizeof(uint8_t) * eventLen) + 4);
+  // first 4 bytes is the event length
+  memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4);
+  // actual event contents
+  memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);
+  toEnqueue->eventSize_ = eventLen + 4;
 
+  //  T_DEBUG_L(1, "event size: %u", eventLen);
   return enqueueEvent(toEnqueue, blockUntilFlush);
 }
 
-void TBufferedFileWriter::enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush) {
-  // Lock mutex
+void TFileTransport::enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush) {
+  // lock mutex
   pthread_mutex_lock(&mutex_);
+
+  // make sure that enqueue buffer is initialized and writer thread is running
+  if (buffer_ == 0) {
+    buffer_ = new eventInfo*[eventBufferSize_];
+  }
+  if (writerThreadId_ == 0) {
+    if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) {
+      T_ERROR("Error creating write thread");
+      return;
+    }
+  }
+
   // Can't enqueue while buffer is full
   while(isFull_) {
     pthread_cond_wait(&notFull_, &mutex_);
@@ -121,7 +153,7 @@
 
   // make a copy and enqueue at tail of buffer
   buffer_[tailPos_] = toEnqueue;
-  tailPos_ = (tailPos_+1) % sz_;
+  tailPos_ = (tailPos_+1) % eventBufferSize_;
   
   // mark the buffer as non-empty
   isEmpty_ = false;
@@ -146,7 +178,7 @@
 
 }
 
-eventInfo TBufferedFileWriter::dequeueEvent(long long deadline) {
+eventInfo* TFileTransport::dequeueEvent(long long deadline) {
   //deadline time struc
   struct timespec ts;
   if(deadline) {
@@ -174,10 +206,10 @@
   bool doSignal = false;
 
   // could be empty if we timed out
-  eventInfo retEvent;
+  eventInfo* retEvent = 0;
   if(!isEmpty_) {
     retEvent = buffer_[headPos_];
-    headPos_ = (headPos_+1) % sz_;
+    headPos_ = (headPos_+1) % eventBufferSize_;
 
     isFull_ = false;
     doSignal = true;
@@ -194,16 +226,129 @@
     pthread_cond_signal(&notFull_);
   }
 
+  if (!retEvent) {
+    retEvent = new eventInfo();
+  }
   return retEvent;
 }
 
 
-void TBufferedFileWriter::flush()
-{
-  eventInfo flushEvent;
-  flushEvent.payLoad_ = NULL;
-  flushEvent.eventSize_ = 0;
+void TFileTransport::writerThread() {
+  // open file if it is not open
+  if(!fd_) {
+    openLogFile();
+  }
 
+  // set the offset to the correct value (EOF)
+  offset_ = lseek(fd_, 0, SEEK_END);
+
+  // Figure out the next time by which a flush must take place
+  long long nextFlush = getCurrentTime() + flushMaxUs_;
+  uint32_t unflushed = 0;
+
+  while(1) {
+    // this will only be true when the destructor is being invoked
+    if(closing_) {
+      if(-1 == ::close(fd_)) {
+        perror("TFileTransport: error in close");
+      }
+      throw TTransportException("error in file close");
+      fd_ = 0;
+      return;
+    }
+
+    //long long start = now();
+    eventInfo* outEvent = dequeueEvent(nextFlush);    
+    if (!outEvent) {
+      T_DEBUG_L(1, "Got an empty event");
+      return;
+    }
+
+    // sanity check on event
+    if ( (maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
+      T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);
+      delete(outEvent);
+      continue;
+    }
+    //long long diff = now()-start;
+    //T_DEBUG("got a dequeue of size %d after %lld ms\n", (int)s.size(), diff/1000);
+
+    // If chunking is required, then make sure that msg does not cross chunk boundary
+    if( (outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
+
+      // event size must be less than chunk size
+      if(outEvent->eventSize_ > chunkSize_) {
+        T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event",
+              outEvent->eventSize_, chunkSize_);
+        delete(outEvent);
+        continue;
+      }
+
+      long long chunk1 = offset_/chunkSize_;
+      long long chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;
+      
+      // if adding this event will cross a chunk boundary, pad the chunk with zeros
+      if(chunk1 != chunk2) {
+        int32_t padding = (int32_t)(chunk2*chunkSize_ - offset_);
+
+        // sanity check
+        if (padding <= 0) {
+          T_DEBUG("Padding is empty, skipping event");
+         continue;
+        }
+        if (padding > (int32_t)chunkSize_) {
+          T_DEBUG("padding is larger than chunk size, skipping event");
+          continue;
+        }
+        //        T_DEBUG("padding %d zeros to get to chunk %lld\n", padding, chunk2);
+        uint8_t zeros[padding];
+        bzero(zeros, padding);
+        T_DEBUG_L(1, "Adding padding of %u bytes at %lu", padding, offset_);
+        if(-1 == ::write(fd_, zeros, padding)) {
+          perror("TFileTransport: error while padding zeros");
+          throw TTransportException("TFileTransport: error while padding zeros");
+        }
+        unflushed += padding;
+        offset_ += padding;
+      }
+    }
+
+    // write the dequeued event to the file
+    if(outEvent->eventSize_ > 0) {
+      if(-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
+        perror("TFileTransport: error while writing event");
+        // TODO: should this trigger an exception or simply continue?
+        throw TTransportException("TFileTransport: error while writing event");
+      }
+
+      unflushed += outEvent->eventSize_;
+      offset_ += outEvent->eventSize_;
+    }
+
+    // couple of cases from which a flush could be triggered
+    if((getCurrentTime() >= nextFlush && unflushed > 0) ||
+       unflushed > flushMaxBytes_ ||
+       (outEvent && (outEvent->eventSize_== 0)) ) {
+      //T_DEBUG("flushing %d bytes to %s (%d %d, full? %d)", unflushed, filename_.c_str(), headPos_, tailPos_, isFull_);
+
+      // sync (force flush) file to disk
+      fsync(fd_);
+      nextFlush = getCurrentTime() + flushMaxUs_;
+      unflushed = 0;
+
+      // notify anybody(thing?) waiting for flush completion
+      pthread_mutex_lock(&mutex_);
+      notFlushed_ = false;
+      pthread_mutex_unlock(&mutex_);
+      pthread_cond_broadcast(&flushed_);
+    }
+    // deallocate dequeued event
+    delete(outEvent);
+  }
+}
+
+void TFileTransport::flush() {
+  eventInfo* flushEvent = new eventInfo();
   notFlushed_ = true;
 
   enqueueEvent(flushEvent, false);
@@ -218,20 +363,264 @@
   pthread_mutex_unlock(&mutex_);
 }
 
-void TBufferedFileWriter::openOutputFile() {
+
+uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) {
+  uint32_t have = 0;
+  uint32_t get = 0;
+  
+  while (have < len) {
+    get = read(buf+have, len-have);
+    if (get <= 0) {
+      throw TEOFException();
+    }
+    have += get;
+  }
+  
+  return have;
+}
+
+uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) {
+  // check if there is an event ready to be read
+  if (!currentEvent_) {
+    readEvent();
+  }
+  
+  // did not manage to read an event from the file. This could have happened
+  // if the timeout expired or there was some other error
+  if (!currentEvent_) {
+    return 0;
+  }
+
+  // read as much of the current event as possible
+  int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
+  if (remaining <= (int32_t)len) {
+    memcpy(buf, 
+           currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, 
+           remaining);
+    delete(currentEvent_);
+    currentEvent_ = 0;
+    return remaining;
+  }
+  
+  // read as much as possible
+  memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len);
+  currentEvent_->eventBuffPos_ += len;
+  return len;
+}
+
+bool TFileTransport::readEvent() {
+  int readTries = 0;
+
+  if (!readBuff_) {
+    readBuff_ = new uint8_t[readBuffSize_];    
+  }
+
+  while (1) {
+    // check if there is anything in the read buffer
+    if (readState_.bufferPtr_ == readState_.bufferLen_) {
+      // advance the offset pointer
+      offset_ += readState_.bufferLen_;
+      readState_.bufferLen_ = ::read(fd_, readBuff_, readBuffSize_);
+      if (readState_.bufferLen_) {
+        T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
+      }
+      readState_.bufferPtr_ = 0;
+      readState_.lastDispatchPtr_ = 0;
+  
+      // read error
+      if (readState_.bufferLen_ == -1) {
+        readState_.resetAllValues();
+        perror("TFileTransport: error while reading from file");
+        // TODO: should this trigger an exception or simply continue?
+        throw TTransportException("TFileTransport: error while reading from file");
+      } else if (readState_.bufferLen_ == 0) {  // EOF
+        // wait indefinitely if there is no timeout
+        if (readTimeout_ == -1) {
+          usleep(eofSleepTime_);
+          continue;
+        } else if (readTimeout_ == 0) {
+          // reset state
+          readState_.resetState(0);
+          return false;
+        } else if (readTimeout_ > 0) {
+          // timeout already expired once
+          if (readTries > 0) {
+            readState_.resetState(0);
+            return false;
+          } else {
+            usleep(readTimeout_ * 1000);
+            readTries++;
+            continue;
+          }
+        }
+      }
+    }
+    
+    readTries = 0;
+
+    // attempt to read an event from the buffer
+    while(readState_.bufferPtr_ < readState_.bufferLen_) {
+      if (readState_.readingSize_) {
+        if(readState_.eventSizeBuffPos_ == 0) {
+          if ( (offset_ + readState_.bufferPtr_)/chunkSize_ != 
+               ((offset_ + readState_.bufferPtr_ + 3)/chunkSize_)) {
+            // skip one byte towards chunk boundary
+            //            T_DEBUG_L(1, "Skipping a byte");
+            readState_.bufferPtr_++;
+            continue;
+          }
+        }
+
+        readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] = 
+          readBuff_[readState_.bufferPtr_++];
+        bool eventCorruption = false;
+        if (readState_.eventSizeBuffPos_ == 4) {
+          // 0 length event indicates padding
+          if (*((uint32_t *)(readState_.eventSizeBuff_)) == 0) {
+            T_DEBUG_L(1, "Got padding");
+            readState_.resetState(readState_.lastDispatchPtr_);
+            continue;
+          }
+          // got a valid event
+          readState_.readingSize_ = false;
+          if (readState_.event_) {
+            delete(readState_.event_);
+          }
+          readState_.event_ = new eventInfo();
+          readState_.event_->eventSize_ = *((uint32_t *)(readState_.eventSizeBuff_));
+
+          T_DEBUG_L(0, "Event size: %u", readState_.event_->eventSize_);
+
+          // TODO
+          // make sure event is valid, an error is triggered if:
+          // 1. Event size is larger than user-specified max-event size
+
+          // 2. Event size is larger than chunk size
+
+          // 3. size indicates that event crosses chunk boundary
+
+        }
+
+        if (eventCorruption) {
+          // perform some kickass recovery 
+        }
+      } else {
+        if (!readState_.event_->eventBuff_) {
+          readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_];
+          readState_.event_->eventBuffPos_ = 0;
+        }
+        // take either the entire event or the remaining bytes in the buffer
+        int reclaimBuffer = min((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_),
+                                readState_.event_->eventSize_ - readState_.event_->eventBuffPos_);
+
+        // copy data from read buffer into event buffer
+        memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_, 
+               readBuff_ + readState_.bufferPtr_,
+               reclaimBuffer);
+        
+        // increment position ptrs
+        readState_.event_->eventBuffPos_ += reclaimBuffer;
+        readState_.bufferPtr_ += reclaimBuffer;
+        
+        //         if (reclaimBuffer > 0) {
+        //           T_DEBUG_L(0, "eventBuffPost: %u", readState_.event_->eventBuffPos_);
+        //           T_DEBUG_L(0, "eventSize: %u", readState_.event_->eventSize_);
+        //         }
+
+        // check if the event has been read in full
+        if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
+          // set the completed event to the current event
+          currentEvent_ = readState_.event_;
+          currentEvent_->eventBuffPos_ = 0;
+          
+          readState_.event_ = 0;
+          readState_.resetState(readState_.bufferPtr_);
+          
+          // exit criteria
+          T_DEBUG_L(0, "Finished one event");
+          return true;
+        }
+      }
+    }
+    
+    
+  }
+}
+
+void TFileTransport::seekToChunk(int32_t chunk) {
+  if (fd_ <= 0) {
+    throw TTransportException("File not open");
+  }
+ 
+  int32_t lastChunk = getNumChunks();
+
+  // negative indicates reverse seek (from the end)
+  if (chunk < 0) {
+    chunk += lastChunk;
+  }
+  
+  // cannot seek past EOF
+  if (chunk > lastChunk) {
+    T_DEBUG("Trying to seek past EOF. Seeking to EOF instead");
+    chunk = lastChunk;
+  }
+
+  uint32_t minEndOffset = 0;
+  if (chunk == lastChunk) {
+    minEndOffset = lseek(fd_, 0, SEEK_END);
+  }
+  
+  offset_ = lseek(fd_, chunk * chunkSize_, SEEK_SET);  
+  readState_.resetAllValues();
+  if (offset_ == -1) {
+    perror("TFileTransport: lseek error in seekToChunk");
+    // TODO: should this trigger an exception or simply continue?
+    throw TTransportException("TFileTransport: lseek error in seekToChunk");
+  }
+
+  // seek to EOF if user wanted to go to last chunk
+  uint32_t oldReadTimeout = getReadTimeout();
+  setReadTimeout(0);  
+  if (chunk == lastChunk) {
+    // keep on reading until the last event present at the point of the seekToChunk call
+    while( readEvent() && ((offset_ + readState_.bufferPtr_) < minEndOffset)) {};
+  }
+  setReadTimeout(oldReadTimeout);
+
+}
+
+void TFileTransport::seekToEnd() {
+  seekToChunk(getNumChunks());
+}
+
+uint32_t TFileTransport::getNumChunks() {
+  if (fd_ <= 0) {
+    return 0;
+  }
+  struct stat f_info;
+  fstat(fd_, &f_info);
+  return (f_info.st_size)/chunkSize_;
+}
+
+// Utility Functions
+void TFileTransport::openLogFile() {
   mode_t mode = S_IRUSR| S_IWUSR| S_IRGRP | S_IROTH;
-  fd_ = ::open(filename_.c_str(), O_WRONLY | O_CREAT | O_APPEND, mode);
+  fd_ = ::open(filename_.c_str(), O_RDWR | O_CREAT | O_APPEND, mode);
 
   // make sure open call was successful
   if(fd_ == -1) {
     char errorMsg[1024];
-    sprintf(errorMsg, "TBufferedFileWriter: Could not open file: %s", filename_.c_str());
+    sprintf(errorMsg, "TFileTransport: Could not open file: %s", filename_.c_str());
     perror(errorMsg);
     throw TTransportException(errorMsg);
   }
+
+  // opening the file in append mode causes offset_t to be at the end
+  offset_ = lseek(fd_, 0, SEEK_CUR);
+  T_DEBUG_L(1, "initial offset: %lu", offset_);
 }
 
-uint32_t TBufferedFileWriter::getCurrentTime() {
+uint32_t TFileTransport::getCurrentTime() {
   long long ret;
   struct timeval tv;
   gettimeofday(&tv, NULL);
@@ -241,108 +630,60 @@
 }
 
 
-void TBufferedFileWriter::writerThread() {
-  // open file if it is not open
-  if(!fd_) {
-    openOutputFile();
-  }
+TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
+                               shared_ptr<TProtocolFactory> protocolFactory,
+                               shared_ptr<TFileTransport> inputTransport):
+  processor_(processor), protocolFactory_(protocolFactory), 
+  inputTransport_(inputTransport) {
 
-  // Figure out the next time by which a flush must take place
-  long long nextFlush = getCurrentTime() + flushMaxUs_;
-  uint32_t unflushed = 0;
+  // default the output transport to a null transport (common case)
+  outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());
+}
 
+TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
+                               shared_ptr<TProtocolFactory> protocolFactory,
+                               shared_ptr<TFileTransport> inputTransport,
+                               shared_ptr<TTransport> outputTransport):
+  processor_(processor), protocolFactory_(protocolFactory), 
+  inputTransport_(inputTransport), outputTransport_(outputTransport) {
+};
+
+void TFileProcessor::process(uint32_t numEvents, bool tail) {
+  pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
+  iop = protocolFactory_->getIOProtocols(inputTransport_, outputTransport_);
+
+  // set the read timeout to 0 if tailing is required
+  int32_t oldReadTimeout = inputTransport_->getReadTimeout();
+  if (tail) {
+    // save old read timeout so it can be restored
+    inputTransport_->setReadTimeout(0);
+  } 
+
+  uint32_t numProcessed = 0;
   while(1) {
-    // this will only be true when the destructor is being invoked
-    if(closing_) {
-      if(-1 == ::close(fd_)) {
-        perror("TBufferedFileWriter: error in close");
+    // bad form to use exceptions for flow control but there is really
+    // no other way around it
+    try {
+      processor_->process(iop.first, iop.second);
+      numProcessed++;
+      if ( (numEvents > 0) && (numProcessed == numEvents)) {
+        return;
       }
-      throw TTransportException("error in file close");
-    }
-
-    //long long start = now();
-    eventInfo outEvent = dequeueEvent(nextFlush);
-
-    // sanity check on event
-    if ( (maxEventSize_ > 0) && (outEvent.eventSize_ > maxEventSize_)) {
-      T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent.eventSize_, maxEventSize_);
-      continue;
-    }
-    //long long diff = now()-start;
-    //T_DEBUG("got a dequeue of size %d after %lld ms\n", (int)s.size(), diff/1000);
-
-    // If chunking is required, then make sure that msg does not cross chunk boundary
-    if( (outEvent.eventSize_ > 0) && (chunkSize_ != 0)) {
-
-      // event size must be less than chunk size
-      if(outEvent.eventSize_ > chunkSize_) {
-        T_ERROR("TBufferedFileWriter: event size(%u) is greater than chunk size(%u): skipping event",
-              outEvent.eventSize_, chunkSize_);
-        continue;
+    } catch (TEOFException& teof) {
+      if (!tail) {
+        break;
       }
-
-      long long chunk1 = offset_/chunkSize_;
-      long long chunk2 = (offset_ + outEvent.eventSize_ - 1)/chunkSize_;
-      
-      // if adding this event will cross a chunk boundary, pad the chunk with zeros
-      if(chunk1 != chunk2) {
-        int padding = (int)(chunk2*chunkSize_ - offset_);
-
-        // sanity check
-        if (padding <= 0) {
-          T_DEBUG("Padding is empty, skipping event");
-          continue;
-        }
-        if (padding > (int32_t)chunkSize_) {
-          T_DEBUG("padding is larger than chunk size, skipping event");
-          continue;
-        }
-        //        T_DEBUG("padding %d zeros to get to chunk %lld\n", padding, chunk2);
-        uint8_t zeros[padding];
-        bzero(zeros, padding);
-        if(-1 == ::write(fd_, zeros, padding)) {
-          perror("TBufferedFileWriter: error while padding zeros");
-          throw TTransportException("TBufferedFileWriter: error while padding zeros");
-        }
-        unflushed += padding;
-        offset_ += padding;
-      }
-    }
-
-    // write the dequeued event to the file
-    if(outEvent.eventSize_ > 0) {
-      if(-1 == ::write(fd_, outEvent.payLoad_, outEvent.eventSize_)) {
-        perror("TBufferedFileWriter: error while writing event");
-        // TODO: should this trigger an exception or simply continue?
-        throw TTransportException("TBufferedFileWriter: error while writing event");
-      }
-
-      // deallocate payload
-      free(outEvent.payLoad_);
-
-      unflushed += outEvent.eventSize_;
-      offset_ += outEvent.eventSize_;
-    }
-
-    // couple of cases from which a flush could be triggered
-    if((getCurrentTime() >= nextFlush && unflushed > 0) ||
-       unflushed > flushMaxBytes_ ||
-       (outEvent.eventSize_ == 0) ) {
-      //T_DEBUG("flushing %d bytes to %s (%d %d, full? %d)", unflushed, filename_.c_str(), headPos_, tailPos_, isFull_);
-
-      // sync (force flush) file to disk
-      fsync(fd_);
-      nextFlush = getCurrentTime() + flushMaxUs_;
-      unflushed = 0;
-
-      // notify anybody(thing?) waiting for flush completion
-      pthread_mutex_lock(&mutex_);
-      notFlushed_ = false;
-      pthread_mutex_unlock(&mutex_);
-      pthread_cond_broadcast(&flushed_);
+    } catch (TException te) {
+      cerr << te.what() << endl;
+      break;
     }
   }
 
+  // restore old read timeout
+  if (tail) {
+    inputTransport_->setReadTimeout(oldReadTimeout);
+  }  
+
 }
 
 }}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TBufferedFileWriter.h b/lib/cpp/src/transport/TBufferedFileWriter.h
index c327aab..8192332 100644
--- a/lib/cpp/src/transport/TBufferedFileWriter.h
+++ b/lib/cpp/src/transport/TBufferedFileWriter.h
@@ -1,10 +1,12 @@
-#ifndef _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_
-#define _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_ 1
+#ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
+#define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1
 
 #include "TTransport.h"
 #include "Thrift.h"
+#include "TProcessor.h"
 
 #include <string>
+#include <stdio.h>
 
 #include <boost/shared_ptr.hpp>
 
@@ -15,42 +17,154 @@
 
 // Data pertaining to a single event
 typedef struct eventInfo {
-   uint8_t* payLoad_;
-   uint32_t eventSize_;
+  uint8_t* eventBuff_;
+  uint32_t eventSize_;
+  uint32_t eventBuffPos_;
 
-  eventInfo():payLoad_(NULL), eventSize_(0){};
+  eventInfo():eventBuff_(NULL), eventSize_(0), eventBuffPos_(0){};
+  ~eventInfo() {
+    if (eventBuff_) {
+      delete[] eventBuff_;
+    }
+  }
 } eventInfo;
 
+// information about current read state
+typedef struct readState {
+  eventInfo* event_;
 
+  // keep track of event size
+  uint8_t   eventSizeBuff_[4];
+  uint8_t   eventSizeBuffPos_;
+  bool      readingSize_;
+
+  // read buffer variables
+  int32_t  bufferPtr_;
+  int32_t  bufferLen_;
+
+  // last successful dispatch point
+  int32_t lastDispatchPtr_;
+  
+  void resetState(uint32_t lastDispatchPtr) {
+    readingSize_ = true;
+    eventSizeBuffPos_ = 0;
+    lastDispatchPtr_ = lastDispatchPtr;
+  }
+
+  void resetAllValues() {
+    resetState(0);
+    bufferPtr_ = 0;
+    bufferLen_ = 0;
+    if (event_) {
+      delete(event_);
+    }
+    event_ = 0;
+  }
+
+  readState() {
+    event_ = 0;
+    resetAllValues();
+  }
+
+  ~readState() {
+    if (event_) {
+      delete(event_);
+    }
+  }
+
+} readState;
+ 
 /**
- * Class that stores a circular in-memory event/message buffer and writes 
- * elements to disk when the buffer becomes full or a flush is triggered.
+ * File implementation of a transport. Reads and writes are done to a 
+ * file on disk.
  *
  * @author Aditya Agarwal <aditya@facebook.com>
  */
-class TBufferedFileWriter : public TTransport {
+class TFileTransport : public TTransport {
  public:
+  TFileTransport(string path);
+  ~TFileTransport();
+
+  // TODO: what is the correct behaviour for this?
+  // the log file is generally always open
+  bool isOpen() {
+    return true;
+  }
+  
+  void write(const uint8_t* buf, uint32_t len) {
+    enqueueEvent(buf, len, false);
+  }
+  
+  void flush();
+
+  uint32_t readAll(uint8_t* buf, uint32_t len);
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  // log-file specific functions
+  void seekToChunk(int chunk);
+  void seekToEnd();
+  uint32_t getNumChunks();
+
+  // for changing the output file
+  void resetOutputFile(int fd, string filename, long long offset);
+
+  // Setter/Getter functions for user-controllable options
+  void setReadBuffSize(uint32_t readBuffSize) {
+    if (readBuffSize) {
+      readBuffSize_ = readBuffSize;
+    }
+  }
+  uint32_t getReadBuffSize() {
+    return readBuffSize_;
+  }
+
+  void setReadTimeout(int32_t readTimeout) {
+    readTimeout_ = readTimeout;
+  }
+  int32_t getReadTimeout() {
+    return readTimeout_;
+  }
+
+  void setChunkSize(uint32_t chunkSize) {
+    if (chunkSize) {
+      chunkSize_ = chunkSize;
+    }
+  }
+  uint32_t getChunkSize() {
+    return chunkSize_;
+  }
+
+  void setEventBufferSize(uint32_t bufferSize) {    
+    if (bufferSize) {
+      if (buffer_) {
+        delete[] buffer_;
+      }
+      eventBufferSize_ = bufferSize;
+      buffer_ = new eventInfo*[eventBufferSize_];
+    }
+  }
+  uint32_t getEventBufferSize() {
+    return eventBufferSize_;
+  }
+
   void setFlushMaxUs(uint32_t flushMaxUs) {
-    flushMaxUs_ = flushMaxUs;
+    if (flushMaxUs) {
+      flushMaxUs_ = flushMaxUs;
+    }
   }
   uint32_t getFlushMaxUs() {
     return flushMaxUs_;
   }
 
   void setFlushMaxBytes(uint32_t flushMaxBytes) {
-    flushMaxBytes_ = flushMaxBytes;
+    if (flushMaxBytes) {
+      flushMaxBytes_ = flushMaxBytes;
+    }
   }
   uint32_t getFlushMaxBytes() {
     return flushMaxBytes_;
   }
 
-  void setChunkSize(uint32_t chunkSize) {
-    chunkSize_ = chunkSize;
-  }
-  uint32_t getChunkSize() {
-    return chunkSize_;
-  }
-
   void setMaxEventSize(uint32_t maxEventSize) {
     maxEventSize_ = maxEventSize;
   }
@@ -58,52 +172,88 @@
     return maxEventSize_;
   }
 
-  TBufferedFileWriter(string filename, uint32_t sz);
-  TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset);
-  void init(string filename, uint32_t sz, int fd, long long offset);
-  ~TBufferedFileWriter();
-
-  void resetOutputFile(int fd, string filename, long long offset);
-
-  void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
-  void enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush);
-  void write(const uint8_t* buf, uint32_t len) {
-    enqueueEvent(buf, len, false);
+  void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {
+    maxCorruptedEvents_ = maxCorruptedEvents;
+  }
+  uint32_t getMaxCorruptedEvents() {
+    return maxCorruptedEvents_;
   }
 
-  eventInfo dequeueEvent(long long deadline);
-  void flush();
+  void setEofSleepTimeUs(uint32_t eofSleepTime) {
+    if (eofSleepTime) {
+      eofSleepTime_ = eofSleepTime;
+    }
+  }
+  uint32_t getEofSleepTimeUs() {
+    return eofSleepTime_;
+  }
+
+ private:
+  // helper functions for writing to a file
+  void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
+  void enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush);
+  eventInfo* dequeueEvent(long long deadline);
 
   // control for writer thread
   static void* startWriterThread(void* ptr) {
-    (((TBufferedFileWriter*)ptr)->writerThread());
+    (((TFileTransport*)ptr)->writerThread());
     return 0;
   }
   void writerThread();
 
+  // helper functions for reading from a file
+  bool readEvent();
 
- private:
-  // circular buffer to hold data in before it is flushed. This is an array of strings. Each
-  // element of the array stores a msg that needs to be written to the file
-  eventInfo* buffer_;
-  
-  // size of string buffer
-  uint32_t sz_;
+  // Utility functions
+  void openLogFile();
+  uint32_t getCurrentTime();
+
+  // Class variables
+  readState readState_;
+  uint8_t* readBuff_;
+
+  eventInfo* currentEvent_;
+
+  uint32_t readBuffSize_;
+  static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;
+
+  int32_t readTimeout_;
+  static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;
 
   // size of chunks that file will be split up into
   uint32_t chunkSize_;
+  static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
 
+  // number of slots in the event buffer (length of the buffer_ array)
+  uint32_t eventBufferSize_;
+  static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 1024;
+
+  // circular buffer to hold data in before it is flushed. This is an array of eventInfo
+  // pointers. Each element of the array refers to an event that needs to be written to the file
+  eventInfo** buffer_;
+  
   // max number of microseconds that can pass without flushing
   uint32_t flushMaxUs_;
+  static const uint32_t DEFAULT_FLUSH_MAX_US = 20000;
 
   // max number of bytes that can be written without flushing
   uint32_t flushMaxBytes_;
+  static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;
 
   // max event size
   uint32_t maxEventSize_;
+  static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;
+
+  // max number of corrupted events per chunk
+  uint32_t maxCorruptedEvents_;
+  static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;
   
+  // sleep duration when EOF is hit
+  uint32_t eofSleepTime_;
+  static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;
+    
   // writer thread id
-  pthread_t writer_;
+  pthread_t writerThreadId_;
 
   // variables that determine position of head/tail of circular buffer
   int headPos_, tailPos_;
@@ -126,13 +276,61 @@
   int fd_;
 
   // Offset within the file
-  long long offset_;
-
-  void openOutputFile();
-  uint32_t getCurrentTime();
+  off_t offset_;
 
 };
 
-}}}
+// Exception thrown when EOF is hit
+class TEOFException : public facebook::thrift::TTransportException {
+ public:
+  TEOFException():
+    facebook::thrift::TTransportException(TTX_EOF) {};
+};
 
-#endif // _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_
+
+// wrapper class to process events from a file containing thrift events
+class TFileProcessor {
+ public:
+  /** 
+   * Constructor that defaults output transport to null transport
+   * 
+   * @param processor processes log-file events
+   * @param protocolFactory protocol factory
+   * @param inputTransport file transport
+   */
+  TFileProcessor(shared_ptr<TProcessor> processor,
+                 shared_ptr<TProtocolFactory> protocolFactory,
+                 shared_ptr<TFileTransport> inputTransport);
+
+  /** 
+   * Constructor
+   * 
+   * @param processor processes log-file events
+   * @param protocolFactory protocol factory
+   * @param inputTransport input file transport
+   * @param outputTransport output transport
+   */    
+  TFileProcessor(shared_ptr<TProcessor> processor,
+                 shared_ptr<TProtocolFactory> protocolFactory,
+                 shared_ptr<TFileTransport> inputTransport,
+                 shared_ptr<TTransport> outputTransport);                      
+
+  /**
+   * processes events from the file
+   *
+   * @param numEvents number of events to process (0 for unlimited)
+   * @param tail tails the file if true
+   */
+  void process(uint32_t numEvents, bool tail);
+
+ private:
+  shared_ptr<TProcessor> processor_;
+  shared_ptr<TProtocolFactory> protocolFactory_;
+  shared_ptr<TFileTransport> inputTransport_;
+  shared_ptr<TTransport> outputTransport_;
+};
+
+ 
+}}} // facebook::thrift::transport
+
+#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_
diff --git a/lib/cpp/src/transport/TTransportException.h b/lib/cpp/src/transport/TTransportException.h
index e02eb70..de94a7c 100644
--- a/lib/cpp/src/transport/TTransportException.h
+++ b/lib/cpp/src/transport/TTransportException.h
@@ -12,6 +12,7 @@
   TTX_UNKNOWN = 0,
   TTX_NOT_OPEN = 1,
   TTX_TIMED_OUT = 2,
+  TTX_EOF = 3,
 };
 
 /**
diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h
index a8003cf..427cc0e 100644
--- a/lib/cpp/src/transport/TTransportUtils.h
+++ b/lib/cpp/src/transport/TTransportUtils.h
@@ -25,7 +25,10 @@
 
   void open() {}
 
-  void write(const std::string& s) {}
+  void write(const uint8_t* buf, uint32_t len) {
+    return;
+  }
+
 };
 
 
diff --git a/test/cpp/src/main.cpp b/test/cpp/src/main.cpp
index 8344a88..47ae671 100644
--- a/test/cpp/src/main.cpp
+++ b/test/cpp/src/main.cpp
@@ -11,6 +11,7 @@
 #include <transport/TTransportUtils.h>
 #include <transport/TBufferedRouterTransport.h>
 #include <transport/TBufferedFileWriter.h>
+#include <TLogging.h>
 
 #include "Service.h"
 
@@ -73,7 +74,12 @@
   int8_t echoByte(int8_t arg) {return arg;}
   int32_t echoI32(int32_t arg) {return arg;}
   int64_t echoI64(int64_t arg) {return arg;}
-  string echoString(string arg) {return arg;}
+  string echoString(string arg) {
+    if (arg != "hello") {
+      T_ERROR_ABORT("WRONG STRING!!!!");
+    }
+    return arg;
+  }
   vector<int8_t> echoList(vector<int8_t> arg) {return arg;}
   set<int8_t> echoSet(set<int8_t> arg) {return arg;}
   map<int8_t, int8_t> echoMap(map<int8_t, int8_t> arg) {return arg;}
@@ -189,20 +195,22 @@
   bool _done;
   Monitor _sleep;
 };
-    
+
+
 int main(int argc, char **argv) {
 
-  int port = 9090;
+  int port = 9091;
   string serverType = "thread-pool";
   string protocolType = "binary";
   size_t workerCount = 4;
-  size_t clientCount = 10;
-  size_t loopCount = 10000;
+  size_t clientCount = 20;
+  size_t loopCount = 50000;
   TType loopType  = T_VOID;
   string callName = "echoVoid";
   bool runServer = true;
   bool logRequests = false;
   string requestLogPath = "./requestlog.tlog";
+  bool replayRequests = false;
 
   ostringstream usage;
 
@@ -217,8 +225,10 @@
     "\tserver-type    Type of server, \"simple\" or \"thread-pool\".  Default is " << serverType << endl <<
     "\tprotocol-type  Type of protocol, \"binary\", \"ascii\", or \"xml\".  Default is " << protocolType << endl <<
     "\tlog-request    Log all request to ./requestlog.tlog. Default is " << logRequests << endl <<
+    "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " << replayRequests << endl <<
     "\tworkers        Number of thread pools workers.  Only valid for thread-pool server type.  Default is " << workerCount << endl;
     
+        
   map<string, string>  args;
   
   for(int ix = 1; ix < argc; ix++) {
@@ -272,6 +282,10 @@
       logRequests = args["log-request"] == "true";
     }
 
+    if(!args["replay-request"].empty()) {
+      replayRequests = args["replay-request"] == "true";
+    }
+
     if(!args["server-type"].empty()) {
       serverType = args["server-type"];
       
@@ -299,6 +313,28 @@
   // Dispatcher
   shared_ptr<Server> serviceHandler(new Server());
 
+  if (replayRequests) {
+    shared_ptr<Server> serviceHandler(new Server());
+    shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
+  
+    // Transports
+    shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
+    fileTransport->setChunkSize(2 * 1024 * 1024);
+    fileTransport->setMaxEventSize(1024 * 16);
+    fileTransport->seekToEnd();
+
+    // Protocol Factory
+    shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
+
+    TFileProcessor fileProcessor(serviceProcessor,
+                                 protocolFactory,
+                                 fileTransport);
+
+    fileProcessor.process(0, true);                                     
+    exit(0);
+  }
+
+
   if(runServer) {
 
     shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
@@ -314,11 +350,12 @@
 
     if (logRequests) {
       // initialize the log file
-      shared_ptr<TBufferedFileWriter> bufferedFileWriter(new TBufferedFileWriter(requestLogPath, 1000));
-      bufferedFileWriter->setChunkSize(2 * 1024 * 1024);
-      bufferedFileWriter->setMaxEventSize(1024 * 16);
+      shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
+      fileTransport->setChunkSize(2 * 1024 * 1024);
+      fileTransport->setMaxEventSize(1024 * 16);
       
-      transportFactory = shared_ptr<TTransportFactory>(new TBufferedRouterTransportFactory(bufferedFileWriter));
+      transportFactory = 
+        shared_ptr<TTransportFactory>(new TBufferedRouterTransportFactory(fileTransport));
     }
 
     shared_ptr<Thread> serverThread;