-- Error recovery code for thrift logfile

Summary:
- perform some basic corruption checks:
   1) Event larger than chunk
   2) Event larger than specified max
   3) Event crossing chunk boundary etc.
- If error encountered, then try to perform some recovery

Reviewed By: Slee

Test Plan: Going to test now...need to check in because of compile issues

Notes:
- These checks take care of the case when there is a dirty read from the filesystem (which
  we have encountered with the netapps). The recovery involves trying to perform the read
  again from ths FS and if that fails skipping the chunk altogether.
  Keep in mind that this might only be useful for idempotent systems (e.g. search redolog).


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664943 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index e4510ec..4615977 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -424,7 +424,7 @@
   }
 
   while (1) {
-    // check if there is anything in the read buffer
+    // read from the file if read buffer is exhausted
     if (readState_.bufferPtr_ == readState_.bufferLen_) {
       // advance the offset pointer
       offset_ += readState_.bufferLen_;
@@ -480,7 +480,6 @@
 
         readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] = 
           readBuff_[readState_.bufferPtr_++];
-        bool eventCorruption = false;
         if (readState_.eventSizeBuffPos_ == 4) {
           // 0 length event indicates padding
           if (*((uint32_t *)(readState_.eventSizeBuff_)) == 0) {
@@ -496,18 +495,12 @@
           readState_.event_ = new eventInfo();
           readState_.event_->eventSize_ = *((uint32_t *)(readState_.eventSizeBuff_));
 
-          // TODO
-          // make sure event is valid, an error is triggered if:
-          // 1. Event size is larger than user-speficied max-event size
-
-          // 2. Event size is larger than chunk size
-
-          // 3. size indicates that event crosses chunk boundary
-
-        }
-
-        if (eventCorruption) {
-          // perform some kickass recovery 
+          // check if the event is corrupted and perform recovery if required
+          if (isEventCorrupted()) {
+            performRecovery();
+            // start from the top
+            break;
+          }
         }
       } else {
         if (!readState_.event_->eventBuff_) {
@@ -527,11 +520,6 @@
         readState_.event_->eventBuffPos_ += reclaimBuffer;
         readState_.bufferPtr_ += reclaimBuffer;
         
-        //         if (reclaimBuffer > 0) {
-        //           T_DEBUG_L(0, "eventBuffPost: %u", readState_.event_->eventBuffPos_);
-        //           T_DEBUG_L(0, "eventSize: %u", readState_.event_->eventSize_);
-        //         }
-
         // check if the event has been read in full
         if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
           // set the completed event to the current event
@@ -542,16 +530,77 @@
           readState_.resetState(readState_.bufferPtr_);
           
           // exit criteria
-          //          T_DEBUG_L(0, "Finished one event");
           return true;
         }
       }
     }
-    
-    
+        
   }
 }
 
+bool TFileTransport::isEventCorrupted() {
+  // an error is triggered if:
+  if ( (maxEventSize_ > 0) &&  (readState_.event_->eventSize_ > maxEventSize_)) {
+    // 1. Event size is larger than user-speficied max-event size
+    T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)",
+            readState_.event_->eventSize_, maxEventSize_);
+    return true;
+  } else if (readState_.event_->eventSize_ > chunkSize_) {
+    // 2. Event size is larger than chunk size
+    T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)",
+               readState_.event_->eventSize_, chunkSize_);            
+    return true;
+  } else if( ((offset_ + readState_.bufferPtr_ - 4)/chunkSize_) !=
+             ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)/chunkSize_) ) {
+    // 3. size indicates that event crosses chunk boundary
+    T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u  Offset:%ld",
+            readState_.event_->eventSize_, offset_ + readState_.bufferPtr_ + 4);
+    return true;
+  }
+
+  return false;
+}
+
+void TFileTransport::performRecovery() {
+  // perform some kickass recovery 
+  uint32_t curChunk = getCurChunk();
+  if (lastBadChunk == curChunk) {
+    numCorruptedEventsinChunk++;
+  } else {
+    lastBadChunk = curChunk;
+    numCorruptedEventsinChunk = 1;
+  }
+          
+  if (numCorruptedEventsinChunk < maxCorruptedEvents_) {
+    // maybe there was an error in reading the file from disk
+    // seek to the beginning of chunk and try again
+    seekToChunk(curChunk);
+  } else {
+
+    // just skip ahead to the next chunk if we not already at the last chunk
+    if (curChunk != (getNumChunks() - 1)) {
+      seekToChunk(curChunk + 1);
+    } else if (readTimeout_ == TAIL_READ_TIMEOUT) {
+      // if tailing the file, wait until there is enough data to start
+      // the next chunk
+      while(curChunk == (getNumChunks() - 1)) {
+        usleep(DEFAULT_CORRUPTED_SLEEP_TIME_US);
+      }
+      seekToChunk(curChunk + 1);
+    } else {
+      // pretty hosed at this stage, rewind the file back to the last successful 
+      // point and punt on the error 
+      readState_.resetState(readState_.lastDispatchPtr_);
+      char errorMsg[1024];
+      sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu", 
+              offset_ + readState_.lastDispatchPtr_);
+      perror(errorMsg);
+      throw TTransportException(errorMsg);
+    }
+  }
+
+}
+
 void TFileTransport::seekToChunk(int32_t chunk) {
   if (fd_ <= 0) {
     throw TTransportException("File not open");
@@ -656,7 +705,22 @@
 TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
                                shared_ptr<TProtocolFactory> protocolFactory,
                                shared_ptr<TFileTransport> inputTransport):
-  processor_(processor), protocolFactory_(protocolFactory), 
+  processor_(processor), 
+  inputProtocolFactory_(protocolFactory), 
+  outputProtocolFactory_(protocolFactory), 
+  inputTransport_(inputTransport) {
+
+  // default the output transport to a null transport (common case)
+  outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());
+}
+
+TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
+                               shared_ptr<TProtocolFactory> inputProtocolFactory,
+                               shared_ptr<TProtocolFactory> outputProtocolFactory,
+                               shared_ptr<TFileTransport> inputTransport):
+  processor_(processor), 
+  inputProtocolFactory_(inputProtocolFactory), 
+  outputProtocolFactory_(outputProtocolFactory), 
   inputTransport_(inputTransport) {
 
   // default the output transport to a null transport (common case)
@@ -667,13 +731,15 @@
                                shared_ptr<TProtocolFactory> protocolFactory,
                                shared_ptr<TFileTransport> inputTransport,
                                shared_ptr<TTransport> outputTransport):
-  processor_(processor), protocolFactory_(protocolFactory), 
-  inputTransport_(inputTransport), outputTransport_(outputTransport) {
-};
+  processor_(processor), 
+  inputProtocolFactory_(protocolFactory), 
+  outputProtocolFactory_(protocolFactory), 
+  inputTransport_(inputTransport), 
+  outputTransport_(outputTransport) {};
 
 void TFileProcessor::process(uint32_t numEvents, bool tail) {
-  pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
-  iop = protocolFactory_->getIOProtocols(inputTransport_, outputTransport_);
+  shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
+  shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
 
   // set the read timeout to 0 if tailing is required
   int32_t oldReadTimeout = inputTransport_->getReadTimeout();
@@ -687,7 +753,7 @@
     // bad form to use exceptions for flow control but there is really
     // no other way around it
     try {
-      processor_->process(iop.first, iop.second);
+      processor_->process(inputProtocol, outputProtocol);
       numProcessed++;
       if ( (numEvents > 0) && (numProcessed == numEvents)) {
         return;
@@ -710,8 +776,8 @@
 }
 
 void TFileProcessor::processChunk() {
-  pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
-  iop = protocolFactory_->getIOProtocols(inputTransport_, outputTransport_);
+  shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
+  shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
 
   uint32_t curChunk = inputTransport_->getCurChunk();
 
@@ -719,7 +785,7 @@
     // bad form to use exceptions for flow control but there is really
     // no other way around it
     try {
-      processor_->process(iop.first, iop.second);
+      processor_->process(inputProtocol, outputProtocol);
       if (curChunk != inputTransport_->getCurChunk()) {
         break;
       }
diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h
index fb4a1c2..a58be2c 100644
--- a/lib/cpp/src/transport/TFileTransport.h
+++ b/lib/cpp/src/transport/TFileTransport.h
@@ -119,6 +119,8 @@
     return readBuffSize_;
   }
 
+  static const int32_t TAIL_READ_TIMEOUT = -1;
+  static const int32_t NO_TAIL_READ_TIMEOUT = 0;
   void setReadTimeout(int32_t readTimeout) {
     readTimeout_ = readTimeout;
   }
@@ -205,6 +207,10 @@
   // helper functions for reading from a file
   bool readEvent();
 
+  // event corruption-related functions
+  bool isEventCorrupted();
+  void performRecovery();
+
   // Utility functions
   void openLogFile();
   uint32_t getCurrentTime();
@@ -252,6 +258,10 @@
   // sleep duration when EOF is hit
   uint32_t eofSleepTime_;
   static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;
+
+  // sleep duration when a corrupted event is encountered
+  uint32_t corruptedEventSleepTime_;
+  static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000;
     
   // writer thread id
   pthread_t writerThreadId_;
@@ -279,6 +289,10 @@
   // Offset within the file
   off_t offset_;
 
+  // event corruption information
+  uint32_t lastBadChunk;
+  uint32_t numCorruptedEventsinChunk;
+  
 };
 
 // Exception thrown when EOF is hit
@@ -303,6 +317,11 @@
                  shared_ptr<TProtocolFactory> protocolFactory,
                  shared_ptr<TFileTransport> inputTransport);
 
+  TFileProcessor(shared_ptr<TProcessor> processor,
+                 shared_ptr<TProtocolFactory> inputProtocolFactory,
+                 shared_ptr<TProtocolFactory> outputProtocolFactory,
+                 shared_ptr<TFileTransport> inputTransport);
+
   /** 
    * Constructor
    * 
@@ -314,7 +333,7 @@
   TFileProcessor(shared_ptr<TProcessor> processor,
                  shared_ptr<TProtocolFactory> protocolFactory,
                  shared_ptr<TFileTransport> inputTransport,
-                 shared_ptr<TTransport> outputTransport);                      
+                 shared_ptr<TTransport> outputTransport);
 
   /**
    * processes events from the file
@@ -332,7 +351,8 @@
   
  private:
   shared_ptr<TProcessor> processor_;
-  shared_ptr<TProtocolFactory> protocolFactory_;
+  shared_ptr<TProtocolFactory> inputProtocolFactory_;
+  shared_ptr<TProtocolFactory> outputProtocolFactory_;
   shared_ptr<TFileTransport> inputTransport_;
   shared_ptr<TTransport> outputTransport_;
 };