THRIFT-926. cpp: Fix bugs in TFileTransport::flush()

Previously flush() had race conditions that could cause it to return
before all data had actually been flushed to disk.  Now the writer
makes sure both buffer queues have been flushed when forceFlush_ is set.

Also, flush() did not wake up the writer thread, so it normally had to
wait for the writer thread to wake up on its own time.  (By default,
this could take up to 3 seconds.)

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005156 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index 50232cf..4deb1aa 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -204,10 +204,10 @@
     throw TTransportException("TFileTransport: attempting to write to file opened readonly");
   }
 
-  enqueueEvent(buf, len, false);
+  enqueueEvent(buf, len);
 }
 
-void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
+void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
   // can't enqueue more events if file is going to close
   if (closing_) {
     return;
@@ -249,6 +249,11 @@
     pthread_cond_wait(&notFull_, &mutex_);
   }
 
+  // We shouldn't be trying to enqueue new data while a forced flush is
+  // requested.  (Otherwise the writer thread might not ever be able to finish
+  // the flush if more data keeps being enqueued.)
+  assert(!forceFlush_);
+
   // add to the buffer
   if (!enqueueBuffer_->addEvent(toEnqueue)) {
     delete toEnqueue;
@@ -259,10 +264,6 @@
   // signal anybody who's waiting for the buffer to be non-empty
   pthread_cond_signal(&notEmpty_);
 
-  if (blockUntilFlush) {
-    pthread_cond_wait(&flushed_, &mutex_);
-  }
-
   // this really should be a loop where it makes sure it got flushed
   // because condition variables can get triggered by the os for no reason
   // it is probably a non-factor for the time being
@@ -449,28 +450,67 @@
       continue;
     }
 
-    bool flushTimeElapsed = false;
-    struct timespec current_time;
-    clock_gettime(CLOCK_REALTIME, &current_time);
+    // Local variable to cache the state of forceFlush_.
+    //
+    // We only want to check the value of forceFlush_ once each time around the
+    // loop.  If we check it more than once without holding the lock the entire
+    // time, it could have changed state in between.  This will result in us
+    // making inconsistent decisions.
+    bool forced_flush = false;
+    pthread_mutex_lock(&mutex_);
+    if (forceFlush_) {
+      if (!enqueueBuffer_->isEmpty()) {
+        // If forceFlush_ is true, we need to flush all available data.
+        // If enqueueBuffer_ is not empty, go back to the start of the loop to
+        // write it out.
+        //
+        // We know the main thread is waiting on forceFlush_ to be cleared,
+        // so no new events will be added to enqueueBuffer_ until we clear
+        // forceFlush_.  Therefore the next time around the loop enqueueBuffer_
+        // is guaranteed to be empty.  (I.e., we're guaranteed to make progress
+        // and clear forceFlush_ the next time around the loop.)
+        pthread_mutex_unlock(&mutex_);
+        continue;
+      }
+      forced_flush = true;
+    }
+    pthread_mutex_unlock(&mutex_);
 
-    if (current_time.tv_sec > ts_next_flush.tv_sec ||
-        (current_time.tv_sec == ts_next_flush.tv_sec && current_time.tv_nsec > ts_next_flush.tv_nsec)) {
-      flushTimeElapsed = true;
-      getNextFlushTime(&ts_next_flush);
+    // determine if we need to perform an fsync
+    bool flush = false;
+    if (forced_flush || unflushed > flushMaxBytes_) {
+      flush = true;
+    } else {
+      struct timespec current_time;
+      clock_gettime(CLOCK_REALTIME, &current_time);
+      if (current_time.tv_sec > ts_next_flush.tv_sec ||
+          (current_time.tv_sec == ts_next_flush.tv_sec &&
+           current_time.tv_nsec > ts_next_flush.tv_nsec)) {
+        if (unflushed > 0) {
+          flush = true;
+        } else {
+          // If there is no new data since the last fsync,
+          // don't perform the fsync, but do reset the timer.
+          getNextFlushTime(&ts_next_flush);
+        }
+      }
     }
 
-    // couple of cases from which a flush could be triggered
-    if ((flushTimeElapsed && unflushed > 0) ||
-       unflushed > flushMaxBytes_ ||
-       forceFlush_) {
-
+    if (flush) {
       // sync (force flush) file to disk
       fsync(fd_);
       unflushed = 0;
+      getNextFlushTime(&ts_next_flush);
 
       // notify anybody waiting for flush completion
-      forceFlush_ = false;
-      pthread_cond_broadcast(&flushed_);
+      if (forced_flush) {
+        pthread_mutex_lock(&mutex_);
+        forceFlush_ = false;
+        assert(enqueueBuffer_->isEmpty());
+        assert(dequeueBuffer_->isEmpty());
+        pthread_cond_broadcast(&flushed_);
+        pthread_mutex_unlock(&mutex_);
+      }
     }
   }
 }
@@ -483,7 +523,10 @@
   // wait for flush to take place
   pthread_mutex_lock(&mutex_);
 
+  // Indicate that we are requesting a flush
   forceFlush_ = true;
+  // Wake up the writer thread so it will perform the flush immediately
+  pthread_cond_signal(&notEmpty_);
 
   while (forceFlush_) {
     pthread_cond_wait(&flushed_, &mutex_);
diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h
index f064b8e..2ea8c9a 100644
--- a/lib/cpp/src/transport/TFileTransport.h
+++ b/lib/cpp/src/transport/TFileTransport.h
@@ -290,7 +290,7 @@
 
  private:
   // helper functions for writing to a file
-  void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
+  void enqueueEvent(const uint8_t* buf, uint32_t eventLen);
   bool swapEventBuffers(struct timespec* deadline);
   bool initBufferAndWriteThread();
 
diff --git a/lib/cpp/test/TFileTransportTest.cpp b/lib/cpp/test/TFileTransportTest.cpp
index a1827ec..4558344 100644
--- a/lib/cpp/test/TFileTransportTest.cpp
+++ b/lib/cpp/test/TFileTransportTest.cpp
@@ -45,14 +45,15 @@
 // Provide BOOST_CHECK_LT() and BOOST_CHECK_GT(), in case we're compiled
 // with an older version of boost
 #ifndef BOOST_CHECK_LT
-#define BOOST_CHECK_CMP(a, b, op) \
-  BOOST_CHECK_MESSAGE((a) op (b), \
-                      "check " BOOST_STRINGIZE(a) " " BOOST_STRINGIZE(op) " " \
-                      BOOST_STRINGIZE(b) " failed: " BOOST_STRINGIZE(a) "=" <<\
-                      (a) << " " BOOST_STRINGIZE(b) "=" << (b))
+#define BOOST_CHECK_CMP(a, b, op, check_fn) \
+  check_fn((a) op (b), \
+           "check " BOOST_STRINGIZE(a) " " BOOST_STRINGIZE(op) " " \
+           BOOST_STRINGIZE(b) " failed: " BOOST_STRINGIZE(a) "=" << (a) << \
+           " " BOOST_STRINGIZE(b) "=" << (b))
 
-#define BOOST_CHECK_LT(a, b) BOOST_CHECK_CMP(a, b, <)
-#define BOOST_CHECK_GT(a, b) BOOST_CHECK_CMP(a, b, >)
+#define BOOST_CHECK_LT(a, b) BOOST_CHECK_CMP(a, b, <, BOOST_CHECK_MESSAGE)
+#define BOOST_CHECK_GT(a, b) BOOST_CHECK_CMP(a, b, >, BOOST_CHECK_MESSAGE)
+#define BOOST_REQUIRE_LT(a, b) BOOST_CHECK_CMP(a, b, <, BOOST_REQUIRE_MESSAGE)
 #endif // BOOST_CHECK_LT
 
 /**
@@ -144,7 +145,6 @@
   return 0;
 }
 
-
 int time_diff(const struct timeval* t1, const struct timeval* t2) {
   return (t2->tv_usec - t1->tv_usec) + (t2->tv_sec - t1->tv_sec) * 1000000;
 }
@@ -301,6 +301,39 @@
   test_flush_max_us_impl(400000, 300000, 1000000);
 }
 
+/**
+ * Make sure flush() is fast when there is nothing to do.
+ *
+ * TFileTransport used to have a bug where flush() would wait for the fsync
+ * timeout to expire.
+ */
+void test_noop_flush() {
+  TempFile f(tmp_dir, "thrift.TFileTransportTest.");
+  TFileTransport transport(f.getPath());
+
+  // Write something to start the writer thread.
+  uint8_t buf[] = "a";
+  transport.write(buf, 1);
+
+  struct timeval start;
+  gettimeofday(&start, NULL);
+
+  for (unsigned int n = 0; n < 10; ++n) {
+    transport.flush();
+
+    struct timeval now;
+    gettimeofday(&now, NULL);
+
+    // Fail if at any point we've been running for longer than half a second.
+    // (With the buggy code, TFileTransport used to take 3 seconds per flush())
+    //
+    // Use a fatal fail so we break out early, rather than continuing to make
+    // many more slow flush() calls.
+    int delta = time_diff(&start, &now);
+    BOOST_REQUIRE_LT(delta, 500000);
+  }
+}
+
 /**************************************************************************
  * General Initialization
  **************************************************************************/
@@ -358,6 +391,7 @@
   suite->add(BOOST_TEST_CASE(test_flush_max_us1));
   suite->add(BOOST_TEST_CASE(test_flush_max_us2));
   suite->add(BOOST_TEST_CASE(test_flush_max_us3));
+  suite->add(BOOST_TEST_CASE(test_noop_flush));
 
   return suite;
 }
diff --git a/lib/cpp/test/TransportTest.cpp b/lib/cpp/test/TransportTest.cpp
index a932643..d6b40dd 100644
--- a/lib/cpp/test/TransportTest.cpp
+++ b/lib/cpp/test/TransportTest.cpp
@@ -450,13 +450,13 @@
   TEST_RW_BUF(CoupledTransports, 1024*1024*30, 0, 0); \
   TEST_RW_BUF(CoupledTransports, 1024*1024*10, rand4k, rand4k); \
   TEST_RW_BUF(CoupledTransports, 1024*1024*10, 167, 163); \
-  TEST_RW_BUF(CoupledTransports, 1024*1024, 1, 1); \
+  TEST_RW_BUF(CoupledTransports, 1024*512, 1, 1); \
   \
   TEST_RW_BUF(CoupledTransports, 1024*1024*10, 0, 0, rand4k, rand4k); \
   TEST_RW_BUF(CoupledTransports, 1024*1024*10, \
               rand4k, rand4k, rand4k, rand4k); \
   TEST_RW_BUF(CoupledTransports, 1024*1024*10, 167, 163, rand4k, rand4k); \
-  TEST_RW_BUF(CoupledTransports, 1024*1024*2, 1, 1, rand4k, rand4k);
+  TEST_RW_BUF(CoupledTransports, 1024*512, 1, 1, rand4k, rand4k);
 
 class TransportTestGen {
  public: