cpp: Prevent TFileTransport from throwing uncaught exceptions

FilleTransport::writerThread throws exceptions. This function is run
inside the writer thread, so these exceptions are not caught. When these
exceptions happen, the system aborts.

The fix is to first eliminate all the throw commands inside this function. In
addition, add some error recovery logic into the code: whenever an IO error
happens, we enter into an error recovery mode. Go to sleep for a few seconds
then try to reopen the file.

Note: Currently, when errors happen, we drop events.

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@920686 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index f67b9e3..ea9f41d 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -82,6 +82,7 @@
   , maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS)
   , eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US)
   , corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US)
+  , writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US)
   , writerThreadId_(0)
   , dequeueBuffer_(NULL)
   , enqueueBuffer_(NULL)
@@ -301,52 +302,85 @@
 
 
 void TFileTransport::writerThread() {
+  bool hasIOError = false;
+
   // open file if it is not open
   if(!fd_) {
-    openLogFile();
+    try {
+      openLogFile();
+    } catch (...) {
+      int errno_copy = errno;
+      GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy);
+      fd_ = 0;
+      hasIOError = true;
+    }
   }
 
   // set the offset to the correct value (EOF)
-  try {
-    seekToEnd();
-  } catch (TException &te) {
+  if (!hasIOError) {
+    try {
+      seekToEnd();
+      // throw away any partial events
+      offset_ += readState_.lastDispatchPtr_;
+      ftruncate(fd_, offset_);
+      readState_.resetAllValues();
+    } catch (...) {
+      int errno_copy = errno;
+      GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy);
+      hasIOError = true;
+    }
   }
 
-  // throw away any partial events
-  offset_ += readState_.lastDispatchPtr_;
-  ftruncate(fd_, offset_);
-  readState_.resetAllValues();
-
   // Figure out the next time by which a flush must take place
-
   struct timespec ts_next_flush;
   getNextFlushTime(&ts_next_flush);
   uint32_t unflushed = 0;
 
-  while(1) {
+  while (1) {
     // this will only be true when the destructor is being invoked
-    if(closing_) {
-      // empty out both the buffers
+    if (closing_) {
+      if (hasIOError) {
+        pthread_exit(NULL);
+      }
+
+      // Try to empty buffers before exit 
       if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
+        fsync(fd_);
         if (-1 == ::close(fd_)) {
           int errno_copy = errno;
           GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
-          throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy);
         }
-        // just be safe and sync to disk
-        fsync(fd_);
-        fd_ = 0;
         pthread_exit(NULL);
-        return;
       }
     }
 
     if (swapEventBuffers(&ts_next_flush)) {
       eventInfo* outEvent;
       while (NULL != (outEvent = dequeueBuffer_->getNext())) {
-        if (!outEvent) {
-          T_DEBUG_L(1, "Got an empty event");
-          return;
+        // Remove an event from the buffer and write it out to disk. If there is any IO error, for instance,
+        // the output file is unmounted or deleted, then this event is dropped. However, the writer thread
+        // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then start writing
+        // from the end.
+
+        while (hasIOError) {
+          T_ERROR("TFileTransport: writer thread going to sleep for %d microseconds due to IO errors", writerThreadIOErrorSleepTime_);
+          usleep(writerThreadIOErrorSleepTime_);
+          if (closing_) {
+            pthread_exit(NULL);
+          }
+          if (!fd_) {
+            ::close(fd_);
+            fd_ = 0;
+          }
+          try {
+            openLogFile();
+            seekToEnd();
+            unflushed = 0;
+            hasIOError = false;
+            T_LOG_OPER("TFileTransport: log file %s reopened by writer thread during error recovery", filename_.c_str());
+          } catch (...) {
+            T_ERROR("TFileTransport: unable to reopen log file %s during error recovery", filename_.c_str());
+          }
         }
 
         // sanity check on event
@@ -357,11 +391,9 @@
 
         // If chunking is required, then make sure that msg does not cross chunk boundary
         if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
-
           // event size must be less than chunk size
-          if(outEvent->eventSize_ > chunkSize_) {
-            T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event",
-                  outEvent->eventSize_, chunkSize_);
+          if (outEvent->eventSize_ > chunkSize_) {
+            T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event", outEvent->eventSize_, chunkSize_);
             continue;
           }
 
@@ -372,14 +404,15 @@
           if (chunk1 != chunk2) {
             // refetch the offset to keep in sync
             offset_ = lseek(fd_, 0, SEEK_CUR);
-            int32_t padding = (int32_t)((offset_/chunkSize_ + 1)*chunkSize_ - offset_);
+            int32_t padding = (int32_t)((offset_ / chunkSize_ + 1) * chunkSize_ - offset_);
 
             uint8_t zeros[padding];
             bzero(zeros, padding);
             if (-1 == ::write(fd_, zeros, padding)) {
               int errno_copy = errno;
               GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", errno_copy);
-              throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error while padding zeros", errno_copy);
+              hasIOError = true;
+              continue;
             }
             unflushed += padding;
             offset_ += padding;
@@ -391,9 +424,9 @@
           if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
             int errno_copy = errno;
             GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);
-            throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error while writing event", errno_copy);
+            hasIOError = true;
+            continue;
           }
-
           unflushed += outEvent->eventSize_;
           offset_ += outEvent->eventSize_;
         }
@@ -401,6 +434,10 @@
       dequeueBuffer_->reset();
     }
 
+    if (hasIOError) {
+      continue;
+    }
+
     bool flushTimeElapsed = false;
     struct timespec current_time;
     clock_gettime(CLOCK_REALTIME, &current_time);
diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h
index b08c5c8..5117b6e 100644
--- a/lib/cpp/src/transport/TFileTransport.h
+++ b/lib/cpp/src/transport/TFileTransport.h
@@ -338,6 +338,10 @@
   uint32_t corruptedEventSleepTime_;
   static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000;
 
+  // sleep duration in seconds when an IO error is encountered in the writer thread
+  uint32_t writerThreadIOErrorSleepTime_;
+  static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000;
+
   // writer thread id
   pthread_t writerThreadId_;