THRIFT-929. cpp: Add tests to verify blocking read behavior

Add tests that check to see whether or not read() and borrow() block
when called with a length larger than the amount of data currently

At the moment, not all of the transports behave the same way.  I believe
the desired behavior is:

  When M bytes are available, and 0 < M < N:
  - read(N): return M bytes immediately
  - borrow(N): return NULL immediately

  When 0 bytes are available:
  - read(N): In this case, it is acceptable either to immediately return
    0, or to block until some data is available.  If the transport
    blocks, it returns immediately when some date becomes available,
    even if less than N bytes are available.
  - borrow(N): return NULL immediately

- The borrow() tests fail when using TBufferedTransport.
  TBufferedTransport incorrectly blocks until the amount of data
  requested is available.

- test_read_none_available() fails when using TFramedTransport.
  Calling read() on a TFramedTransport when no data is available throws
  an exception instead of returning 0.

- test_read_none_available() fails when using TFDTransport.  This is
  partly just an artifact of the fact that I use SIGALRM as part of this
  test.  Unlike TSocket, TFDTransport doesn't retry after EINTR.

- test_read_part_available() fails when using TZlibTransport around a
  transport that has blocking read() behavior.  TZlibTransport::read()
  loops calling read() on the underlying transport.  It should probably
  break out of the loop and return to the caller as soon as it has
  uncompressed any data, even if it is less than requested and more
  might be available.  Once some data has been uncompressed,
  TZlibTransport cannot risk calling read() again since it might block.

Will commit fixes for these separately.

git-svn-id: 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/test/TransportTest.cpp b/lib/cpp/test/TransportTest.cpp
index 49e9754..6eb4523 100644
--- a/lib/cpp/test/TransportTest.cpp
+++ b/lib/cpp/test/TransportTest.cpp
@@ -24,6 +24,7 @@
 #include <time.h>
 #include <unistd.h>
 #include <getopt.h>
+#include <signal.h>
 #include <sstream>
 #include <tr1/functional>
@@ -307,7 +308,134 @@
- * Main testing function
+ * Alarm handling code for use in tests that check the transport blocking
+ * semantics.
+ *
+ * If the transport ends up blocking, we don't want to hang forever.  We use
+ * SIGALRM to fire schedule signal to wake up and try to write data so the
+ * transport will unblock.
+ *
+ * It isn't really the safest thing in the world to be mucking around with
+ * complicated global data structures in a signal handler.  It should probably
+ * be okay though, since we know the main thread should always be blocked in a
+ * read() request when the signal handler is running.
+ **************************************************************************/
+struct TriggerInfo {
+  TriggerInfo(int seconds, const boost::shared_ptr<TTransport>& transport,
+              uint32_t writeLength) :
+    timeoutSeconds(seconds),
+    transport(transport),
+    writeLength(writeLength),
+    next(NULL) {}
+  int timeoutSeconds;
+  boost::shared_ptr<TTransport> transport;
+  uint32_t writeLength;
+  TriggerInfo* next;
+TriggerInfo* triggerInfo;
+unsigned int numTriggersFired;
+void set_alarm();
+void alarm_handler(int signum) {
+  // The alarm timed out, which almost certainly means we're stuck
+  // on a transport that is incorrectly blocked.
+  ++numTriggersFired;
+  // Note: we print messages to stdout instead of stderr, since
+  // tools/test/runner only records stdout messages in the failure messages for
+  // boost tests.  (boost prints its test info to stdout.)
+  printf("Timeout alarm expired; attempting to unblock transport\n");
+  if (triggerInfo == NULL) {
+    printf("  trigger stack is empty!\n");
+  }
+  // Pop off the first TriggerInfo.
+  // If there is another one, schedule an alarm for it.
+  TriggerInfo* info = triggerInfo;
+  triggerInfo = info->next;
+  set_alarm();
+  // Write some data to the transport to hopefully unblock it.
+  uint8_t buf[info->writeLength];
+  memset(buf, 'b', info->writeLength);
+  info->transport->write(buf, info->writeLength);
+  info->transport->flush();
+  delete info;
+void set_alarm() {
+  if (triggerInfo == NULL) {
+    // clear any alarm
+    alarm(0);
+    return;
+  }
+  struct sigaction action;
+  memset(&action, 0, sizeof(action));
+  action.sa_handler = alarm_handler;
+  action.sa_flags = SA_ONESHOT;
+  sigemptyset(&action.sa_mask);
+  sigaction(SIGALRM, &action, NULL);
+  alarm(triggerInfo->timeoutSeconds);
+ * Add a trigger to be scheduled "seconds" seconds after the
+ * last currently scheduled trigger.
+ *
+ * (Note that this is not "seconds" from now.  That might be more logical, but
+ * would require slightly more complicated sorting, rather than just appending
+ * to the end.)
+ */
+void add_trigger(unsigned int seconds,
+                 const boost::shared_ptr<TTransport> &transport,
+                 uint32_t write_len) {
+  TriggerInfo* info = new TriggerInfo(seconds, transport, write_len);
+  if (triggerInfo == NULL) {
+    // This is the first trigger.
+    // Set triggerInfo, and schedule the alarm
+    triggerInfo = info;
+    set_alarm();
+  } else {
+    // Add this trigger to the end of the list
+    TriggerInfo* prev = triggerInfo;
+    while (prev->next) {
+      prev = prev->next;
+    }
+    prev->next = info;
+  }
+void clear_triggers() {
+  TriggerInfo *info = triggerInfo;
+  alarm(0);
+  triggerInfo = NULL;
+  numTriggersFired = 0;
+  while (info != NULL) {
+    TriggerInfo* next = info->next;
+    delete info;
+    info = next;
+  }
+void set_trigger(unsigned int seconds,
+                 const boost::shared_ptr<TTransport> &transport,
+                 uint32_t write_len) {
+  clear_triggers();
+  add_trigger(seconds, transport, write_len);
+ * Test functions
@@ -430,6 +558,100 @@
   BOOST_CHECK_EQUAL(memcmp(rbuf.get(), wbuf.get(), totalSize), 0);
+template <class CoupledTransports>
+void test_read_part_available() {
+  CoupledTransports transports;
+  BOOST_REQUIRE(transports.out != NULL);
+  uint8_t write_buf[16];
+  uint8_t read_buf[16];
+  memset(write_buf, 'a', sizeof(write_buf));
+  // Attemping to read 10 bytes when only 9 are available should return 9
+  // immediately.
+  transports.out->write(write_buf, 9);
+  transports.out->flush();
+  set_trigger(3, transports.out, 1);
+  uint32_t bytes_read =>read(read_buf, 10);
+  BOOST_CHECK_EQUAL(numTriggersFired, 0);
+  BOOST_CHECK_EQUAL(bytes_read, 9);
+  clear_triggers();
+template <class CoupledTransports>
+void test_borrow_part_available() {
+  CoupledTransports transports;
+  BOOST_REQUIRE(transports.out != NULL);
+  uint8_t write_buf[16];
+  uint8_t read_buf[16];
+  memset(write_buf, 'a', sizeof(write_buf));
+  // Attemping to borrow 10 bytes when only 9 are available should return NULL
+  // immediately.
+  transports.out->write(write_buf, 9);
+  transports.out->flush();
+  set_trigger(3, transports.out, 1);
+  uint32_t borrow_len = 10;
+  const uint8_t* borrowed_buf =>borrow(read_buf, &borrow_len);
+  BOOST_CHECK_EQUAL(numTriggersFired, 0);
+  BOOST_CHECK(borrowed_buf == NULL);
+  clear_triggers();
+template <class CoupledTransports>
+void test_read_none_available() {
+  CoupledTransports transports;
+  BOOST_REQUIRE(transports.out != NULL);
+  uint8_t write_buf[16];
+  uint8_t read_buf[16];
+  memset(write_buf, 'a', sizeof(write_buf));
+  // Attempting to read when no data is available should either block until
+  // some data is available, or fail immediately.  (e.g., TSocket blocks,
+  // TMemoryBuffer just fails.)
+  //
+  // If the transport blocks, it should succeed once some data is available,
+  // even if less than the amount requested becomes available.
+  set_trigger(1, transports.out, 2);
+  add_trigger(1, transports.out, 8);
+  uint32_t bytes_read =>read(read_buf, 10);
+  if (bytes_read == 0) {
+    BOOST_CHECK_EQUAL(numTriggersFired, 0);
+    clear_triggers();
+  } else {
+    BOOST_CHECK_EQUAL(numTriggersFired, 1);
+    BOOST_CHECK_EQUAL(bytes_read, 2);
+  }
+  clear_triggers();
+template <class CoupledTransports>
+void test_borrow_none_available() {
+  CoupledTransports transports;
+  BOOST_REQUIRE(transports.out != NULL);
+  uint8_t write_buf[16];
+  memset(write_buf, 'a', sizeof(write_buf));
+  // Attempting to borrow when no data is available should fail immediately
+  set_trigger(1, transports.out, 10);
+  uint32_t borrow_len = 10;
+  const uint8_t* borrowed_buf =>borrow(NULL, &borrow_len);
+  BOOST_CHECK(borrowed_buf == NULL);
+  BOOST_CHECK_EQUAL(numTriggersFired, 0);
+  clear_triggers();
  * Test case generation
@@ -446,31 +668,41 @@
  *   is compiler-dependent.  gcc returns mangled names.)
-#define ADD_TEST(CoupledTransports, totalSize, ...) \
-    addTest< CoupledTransports >(BOOST_STRINGIZE(CoupledTransports), \
-                                 totalSize, ## __VA_ARGS__);
+#define ADD_TEST_RW(CoupledTransports, totalSize, ...) \
+    addTestRW< CoupledTransports >(BOOST_STRINGIZE(CoupledTransports), \
+                                   totalSize, ## __VA_ARGS__);
 #define TEST_RW(CoupledTransports, totalSize, ...) \
   do { \
     /* Add the test as specified, to test the non-virtual function calls */ \
-    ADD_TEST(CoupledTransports, totalSize, ## __VA_ARGS__); \
+    ADD_TEST_RW(CoupledTransports, totalSize, ## __VA_ARGS__); \
     /* \
      * Also test using the transport as a TTransport*, to test \
      * the read_virt()/write_virt() calls \
      */ \
-    ADD_TEST(CoupledTTransports<CoupledTransports>, \
-             totalSize, ## __VA_ARGS__); \
+    ADD_TEST_RW(CoupledTTransports<CoupledTransports>, \
+                totalSize, ## __VA_ARGS__); \
     /* Test wrapping the transport with TBufferedTransport */ \
-    ADD_TEST(CoupledBufferedTransportsT<CoupledTransports>, \
-        totalSize, ## __VA_ARGS__); \
+    ADD_TEST_RW(CoupledBufferedTransportsT<CoupledTransports>, \
+                totalSize, ## __VA_ARGS__); \
     /* Test wrapping the transport with TFramedTransports */ \
-    ADD_TEST(CoupledFramedTransportsT<CoupledTransports>, \
-             totalSize, ## __VA_ARGS__); \
+    ADD_TEST_RW(CoupledFramedTransportsT<CoupledTransports>, \
+                totalSize, ## __VA_ARGS__); \
     /* Test wrapping the transport with TZlibTransport */ \
-    ADD_TEST(CoupledZlibTransportsT<CoupledTransports>, \
-             totalSize, ## __VA_ARGS__); \
+    ADD_TEST_RW(CoupledZlibTransportsT<CoupledTransports>, \
+                totalSize, ## __VA_ARGS__); \
   } while (0)
+#define ADD_TEST_BLOCKING(CoupledTransports) \
+    addTestBlocking< CoupledTransports >(BOOST_STRINGIZE(CoupledTransports));
+#define TEST_BLOCKING_BEHAVIOR(CoupledTransports) \
+  ADD_TEST_BLOCKING(CoupledTransports); \
+  ADD_TEST_BLOCKING(CoupledTTransports<CoupledTransports>); \
+  ADD_TEST_BLOCKING(CoupledBufferedTransportsT<CoupledTransports>); \
+  ADD_TEST_BLOCKING(CoupledFramedTransportsT<CoupledTransports>); \
+  ADD_TEST_BLOCKING(CoupledZlibTransportsT<CoupledTransports>);
 class TransportTestGen {
   TransportTestGen(boost::unit_test::test_suite* suite,
@@ -497,6 +729,8 @@
     TEST_RW(CoupledMemoryBuffers, 1024*256, 167, 163, rand4k, rand4k);
     TEST_RW(CoupledMemoryBuffers, 1024*16, 1, 1, rand4k, rand4k);
+    TEST_BLOCKING_BEHAVIOR(CoupledMemoryBuffers);
     // TFDTransport tests
     // Since CoupledFDTransports tests with a pipe, writes will block
     // if there is too much outstanding unread data in the pipe.
@@ -519,6 +753,8 @@
     TEST_RW(CoupledFDTransports, 1024*16, 1, 1,
             rand4k, rand4k, fd_max_outstanding);
+    TEST_BLOCKING_BEHAVIOR(CoupledFDTransports);
     // TSocket tests
     uint32_t socket_max_outstanding = 4096;
     TEST_RW(CoupledSocketTransports, 1024*1024, 0, 0,
@@ -541,6 +777,8 @@
     TEST_RW(CoupledSocketTransports, 1024*16, 1, 1,
             rand4k, rand4k, 400);
+    TEST_BLOCKING_BEHAVIOR(CoupledSocketTransports);
     // TFileTransport tests
     // We use smaller buffer sizes here, since TFileTransport is fairly slow.
@@ -557,30 +795,32 @@
     TEST_RW(CoupledFileTransports, 1024*64, 167, 163, rand4k, rand4k);
     TEST_RW(CoupledFileTransports, 1024*2, 1, 1, rand4k, rand4k);
+    TEST_BLOCKING_BEHAVIOR(CoupledFileTransports);
     // Add some tests that access TBufferedTransport and TFramedTransport
     // via TTransport pointers and TBufferBase pointers.
-    ADD_TEST(CoupledTTransports<CoupledBufferedTransports>,
-             1024*1024, rand4k, rand4k, rand4k, rand4k);
-    ADD_TEST(CoupledBufferBases<CoupledBufferedTransports>,
-             1024*1024, rand4k, rand4k, rand4k, rand4k);
-    ADD_TEST(CoupledTTransports<CoupledFramedTransports>,
-             1024*1024, rand4k, rand4k, rand4k, rand4k);
-    ADD_TEST(CoupledBufferBases<CoupledFramedTransports>,
-             1024*1024, rand4k, rand4k, rand4k, rand4k);
+    ADD_TEST_RW(CoupledTTransports<CoupledBufferedTransports>,
+                1024*1024, rand4k, rand4k, rand4k, rand4k);
+    ADD_TEST_RW(CoupledBufferBases<CoupledBufferedTransports>,
+                1024*1024, rand4k, rand4k, rand4k, rand4k);
+    ADD_TEST_RW(CoupledTTransports<CoupledFramedTransports>,
+                1024*1024, rand4k, rand4k, rand4k, rand4k);
+    ADD_TEST_RW(CoupledBufferBases<CoupledFramedTransports>,
+                1024*1024, rand4k, rand4k, rand4k, rand4k);
     // Test using TZlibTransport via a TTransport pointer
-    ADD_TEST(CoupledTTransports<CoupledZlibTransports>,
-             1024*1024, rand4k, rand4k, rand4k, rand4k);
+    ADD_TEST_RW(CoupledTTransports<CoupledZlibTransports>,
+                1024*1024, rand4k, rand4k, rand4k, rand4k);
   template <class CoupledTransports>
-  void addTest(const char* transport_name, uint32_t totalSize,
-               GenericSizeGenerator wSizeGen, GenericSizeGenerator rSizeGen,
-               GenericSizeGenerator wChunkSizeGen = 0,
-               GenericSizeGenerator rChunkSizeGen = 0,
-               uint32_t maxOutstanding = 0,
-               uint32_t expectedFailures = 0) {
+  void addTestRW(const char* transport_name, uint32_t totalSize,
+                 GenericSizeGenerator wSizeGen, GenericSizeGenerator rSizeGen,
+                 GenericSizeGenerator wChunkSizeGen = 0,
+                 GenericSizeGenerator rChunkSizeGen = 0,
+                 uint32_t maxOutstanding = 0,
+                 uint32_t expectedFailures = 0) {
     // adjust totalSize by the specified sizeMultiplier_ first
     totalSize = static_cast<uint32_t>(totalSize * sizeMultiplier_);
@@ -599,6 +839,37 @@
     suite_->add(tc, expectedFailures);
+  template <class CoupledTransports>
+  void addTestBlocking(const char* transportName,
+                       uint32_t expectedFailures = 0) {
+    char name[1024];
+    boost::unit_test::test_case* tc;
+    snprintf(name, sizeof(name), "%s::test_read_part_available()",
+             transportName);
+    tc = boost::unit_test::make_test_case(
+          test_read_part_available<CoupledTransports>, name);
+    suite_->add(tc, expectedFailures);
+    snprintf(name, sizeof(name), "%s::test_read_none_available()",
+             transportName);
+    tc = boost::unit_test::make_test_case(
+          test_read_none_available<CoupledTransports>, name);
+    suite_->add(tc, expectedFailures);
+    snprintf(name, sizeof(name), "%s::test_borrow_part_available()",
+             transportName);
+    tc = boost::unit_test::make_test_case(
+          test_borrow_part_available<CoupledTransports>, name);
+    suite_->add(tc, expectedFailures);
+    snprintf(name, sizeof(name), "%s::test_borrow_none_available()",
+             transportName);
+    tc = boost::unit_test::make_test_case(
+          test_borrow_none_available<CoupledTransports>, name);
+    suite_->add(tc, expectedFailures);
+  }
   boost::unit_test::test_suite* suite_;
   // sizeMultiplier_ is configurable via the command line, and allows the
   // user to adjust between smaller buffers that can be tested quickly,