THRIFT-929. cpp: Add unit test for transport functionality

Tests various transport types by writing data, and verifying it can be
read back successfully.  Tests both virtual calls (accessed as
TTransport*) and non-virtual calls (accessed as the appropriate pointer
type, or as TBufferBase* when appropriate).

This is similar to some of the tests already performed in
TMemoryBufferTest and ZlibTest.cpp.  However, this tests a few more
transport types, and it interleaves read and write calls more heavily.
(It currently exposes some bugs in flush() in a couple transports.) It
also exercises both the virtual and non-virtual code paths, now that
read() and write() only result in virtual calls when invoked on a
TTransport*.

TFileTransport currently has several race condition bugs, so most of the
TFileTransport tests ususally fail.  It also has some performance bugs,
causing the TFileTransport tests to take a long time.  Will fix those
issues separately.

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005146 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am
index 536796f..e74c089 100644
--- a/lib/cpp/test/Makefile.am
+++ b/lib/cpp/test/Makefile.am
@@ -49,6 +49,7 @@
 	OptionalRequiredTest \
 	SpecializationTest \
 	AllProtocolsTest \
+	TransportTest \
 	UnitTests
 
 TESTS = \
@@ -61,6 +62,11 @@
 
 UnitTests_LDADD = $(BOOST_LDFLAGS) libtestgencpp.la -lboost_unit_test_framework
 
+TransportTest_SOURCES = \
+	TransportTest.cpp
+
+TransportTest_LDADD = libtestgencpp.la $(top_builddir)/lib/cpp/libthriftz.la -l:libboost_unit_test_framework.a -lz
+
 #
 # TFDTransportTest
 #
diff --git a/lib/cpp/test/TransportTest.cpp b/lib/cpp/test/TransportTest.cpp
new file mode 100644
index 0000000..e5ddeee
--- /dev/null
+++ b/lib/cpp/test/TransportTest.cpp
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE // needed for getopt_long
+#endif
+
+#include <stdlib.h>
+#include <time.h>
+#include <unistd.h>
+#include <getopt.h>
+#include <sstream>
+#include <tr1/functional>
+
+#include <boost/mpl/list.hpp>
+#include <boost/shared_array.hpp>
+#include <boost/random.hpp>
+#include <boost/type_traits.hpp>
+#include <boost/test/unit_test.hpp>
+
+#include <transport/TBufferTransports.h>
+#include <transport/TFDTransport.h>
+#include <transport/TFileTransport.h>
+
+using namespace apache::thrift::transport;
+
+static boost::mt19937 rng;
+static const char* tmp_dir = "/tmp";
+
+void initrand(const int* seedptr) {
+  unsigned int seed;
+  if (seedptr) {
+    seed = *seedptr;
+  } else {
+    seed = static_cast<unsigned int>(time(NULL));
+  }
+
+  rng.seed(seed);
+}
+
+class SizeGenerator {
+ public:
+  virtual ~SizeGenerator() {}
+  virtual uint32_t nextSize() = 0;
+  virtual std::string describe() const = 0;
+};
+
+class ConstantSizeGenerator : public SizeGenerator {
+ public:
+  ConstantSizeGenerator(uint32_t value) : value_(value) {}
+  uint32_t nextSize() { return value_; }
+  std::string describe() const {
+    std::ostringstream desc;
+    desc << value_;
+    return desc.str();
+  }
+
+ private:
+  uint32_t value_;
+};
+
+class RandomSizeGenerator : public SizeGenerator {
+ public:
+  RandomSizeGenerator(uint32_t min, uint32_t max) :
+    generator_(rng, boost::uniform_int<int>(min, max)) {}
+
+  uint32_t nextSize() { return generator_(); }
+
+  std::string describe() const {
+    std::ostringstream desc;
+    desc << "rand(" << getMin() << ", " << getMax() << ")";
+    return desc.str();
+  }
+
+  uint32_t getMin() const { return generator_.distribution().min(); }
+  uint32_t getMax() const { return generator_.distribution().max(); }
+
+ private:
+  boost::variate_generator< boost::mt19937&, boost::uniform_int<int> >
+    generator_;
+};
+
+/**
+ * This class exists solely to make the TEST_RW() macro easier to use.
+ * - it can be constructed implicitly from an integer
+ * - it can contain either a ConstantSizeGenerator or a RandomSizeGenerator
+ *   (TEST_RW can't take a SizeGenerator pointer or reference, since it needs
+ *   to make a copy of the generator to bind it to the test function.)
+ */
+class GenericSizeGenerator : public SizeGenerator {
+ public:
+  GenericSizeGenerator(uint32_t value) :
+    generator_(new ConstantSizeGenerator(value)) {}
+  GenericSizeGenerator(uint32_t min, uint32_t max) :
+    generator_(new RandomSizeGenerator(min, max)) {}
+
+  uint32_t nextSize() { return generator_->nextSize(); }
+  std::string describe() const { return generator_->describe(); }
+
+ private:
+  boost::shared_ptr<SizeGenerator> generator_;
+};
+
+/**************************************************************************
+ * Classes to set up coupled transports
+ **************************************************************************/
+
+template <class Transport_>
+class CoupledTransports {
+ public:
+  typedef Transport_ TransportType;
+
+  CoupledTransports() : in(NULL), out(NULL) {}
+
+  Transport_* in;
+  Transport_* out;
+
+ private:
+  CoupledTransports(const CoupledTransports&);
+  CoupledTransports &operator=(const CoupledTransports&);
+};
+
+class CoupledMemoryBuffers : public CoupledTransports<TMemoryBuffer> {
+ public:
+  CoupledMemoryBuffers() {
+    in = &buf;
+    out = &buf;
+  }
+
+  TMemoryBuffer buf;
+};
+
+class CoupledBufferedTransports :
+  public CoupledTransports<TBufferedTransport> {
+ public:
+  CoupledBufferedTransports() :
+    buf(new TMemoryBuffer) {
+    in = new TBufferedTransport(buf);
+    out = new TBufferedTransport(buf);
+  }
+
+  ~CoupledBufferedTransports() {
+    delete in;
+    delete out;
+  }
+
+  boost::shared_ptr<TMemoryBuffer> buf;
+};
+
+class CoupledFramedTransports : public CoupledTransports<TFramedTransport> {
+ public:
+  CoupledFramedTransports() :
+    buf(new TMemoryBuffer) {
+    in = new TFramedTransport(buf);
+    out = new TFramedTransport(buf);
+  }
+
+  ~CoupledFramedTransports() {
+    delete in;
+    delete out;
+  }
+
+  boost::shared_ptr<TMemoryBuffer> buf;
+};
+
+class CoupledFDTransports : public CoupledTransports<TFDTransport> {
+ public:
+  CoupledFDTransports() {
+    int pipes[2];
+
+    if (pipe(pipes) != 0) {
+      return;
+    }
+
+    in = new TFDTransport(pipes[0], TFDTransport::CLOSE_ON_DESTROY);
+    out = new TFDTransport(pipes[1], TFDTransport::CLOSE_ON_DESTROY);
+  }
+
+  ~CoupledFDTransports() {
+    delete in;
+    delete out;
+  }
+};
+
+class CoupledFileTransports : public CoupledTransports<TFileTransport> {
+ public:
+  CoupledFileTransports() {
+    // Create a temporary file to use
+    size_t filename_len = strlen(tmp_dir) + 32;
+    filename = new char[filename_len];
+    snprintf(filename, filename_len,
+             "%s/thrift.transport_test.XXXXXX", tmp_dir);
+    fd = mkstemp(filename);
+    if (fd < 0) {
+      return;
+    }
+
+    in = new TFileTransport(filename, true);
+    out = new TFileTransport(filename);
+  }
+
+  ~CoupledFileTransports() {
+    delete in;
+    delete out;
+
+    if (fd >= 0) {
+      close(fd);
+      unlink(filename);
+    }
+    delete[] filename;
+  }
+
+  char* filename;
+  int fd;
+};
+
+template <class CoupledTransports_>
+class CoupledTTransports : public CoupledTransports<TTransport> {
+ public:
+  CoupledTTransports() : transports() {
+    in = transports.in;
+    out = transports.out;
+  }
+
+  CoupledTransports_ transports;
+};
+
+template <class CoupledTransports_>
+class CoupledBufferBases : public CoupledTransports<TBufferBase> {
+ public:
+  CoupledBufferBases() : transports() {
+    in = transports.in;
+    out = transports.out;
+  }
+
+  CoupledTransports_ transports;
+};
+
+/*
+ * TODO: It would be nice to test TSocket, too.
+ * Unfortunately, TSocket/TServerSocket currently don't provide a low-level
+ * API that would allow us to create a connected socket pair.
+ *
+ * TODO: It would be nice to test TZlibTransport, too.
+ * However, TZlibTransport doesn't conform to quite the same semantics as other
+ * transports.  No new data can be written to a TZlibTransport after flush() is
+ * called, since flush() terminates the zlib data stream.  In the future maybe
+ * we should make TZlibTransport behave more like the other transports.
+ */
+
+/**************************************************************************
+ * Main testing function
+ **************************************************************************/
+
+/**
+ * Test interleaved write and read calls.
+ *
+ * Generates a buffer totalSize bytes long, then writes it to the transport,
+ * and verifies the written data can be read back correctly.
+ *
+ * Mode of operation:
+ * - call wChunkGenerator to figure out how large of a chunk to write
+ *   - call wSizeGenerator to get the size for individual write() calls,
+ *     and do this repeatedly until the entire chunk is written.
+ * - call rChunkGenerator to figure out how large of a chunk to read
+ *   - call rSizeGenerator to get the size for individual read() calls,
+ *     and do this repeatedly until the entire chunk is read.
+ * - repeat until the full buffer is written and read back,
+ *   then compare the data read back against the original buffer
+ *
+ *
+ * - If any of the size generators return 0, this means to use the maximum
+ *   possible size.
+ *
+ * - If maxOutstanding is non-zero, write chunk sizes will be chosen such that
+ *   there are never more than maxOutstanding bytes waiting to be read back.
+ */
+template <class CoupledTransports>
+void test_rw(uint32_t totalSize,
+             SizeGenerator& wSizeGenerator,
+             SizeGenerator& rSizeGenerator,
+             SizeGenerator& wChunkGenerator,
+             SizeGenerator& rChunkGenerator,
+             uint32_t maxOutstanding) {
+  CoupledTransports transports;
+  BOOST_REQUIRE(transports.in != NULL);
+  BOOST_REQUIRE(transports.out != NULL);
+
+  boost::shared_array<uint8_t> wbuf =
+    boost::shared_array<uint8_t>(new uint8_t[totalSize]);
+  boost::shared_array<uint8_t> rbuf =
+    boost::shared_array<uint8_t>(new uint8_t[totalSize]);
+
+  // store some data in wbuf
+  for (uint32_t n = 0; n < totalSize; ++n) {
+    wbuf[n] = (n & 0xff);
+  }
+  // clear rbuf
+  memset(rbuf.get(), 0, totalSize);
+
+  uint32_t total_written = 0;
+  uint32_t total_read = 0;
+  while (total_read < totalSize) {
+    // Determine how large a chunk of data to write
+    uint32_t wchunk_size = wChunkGenerator.nextSize();
+    if (wchunk_size == 0 || wchunk_size > totalSize - total_written) {
+      wchunk_size = totalSize - total_written;
+    }
+
+    // Make sure (total_written - total_read) + wchunk_size
+    // is less than maxOutstanding
+    if (maxOutstanding > 0 &&
+        wchunk_size > maxOutstanding - (total_written - total_read)) {
+      wchunk_size = maxOutstanding - (total_written - total_read);
+    }
+
+    // Write the chunk
+    uint32_t chunk_written = 0;
+    while (chunk_written < wchunk_size) {
+      uint32_t write_size = wSizeGenerator.nextSize();
+      if (write_size == 0 || write_size > wchunk_size - chunk_written) {
+        write_size = wchunk_size - chunk_written;
+      }
+
+      transports.out->write(wbuf.get() + total_written, write_size);
+      chunk_written += write_size;
+      total_written += write_size;
+    }
+
+    // Flush the data, so it will be available in the read transport
+    // Don't flush if wchunk_size is 0.  (This should only happen if
+    // total_written == totalSize already, and we're only reading now.)
+    if (wchunk_size > 0) {
+      transports.out->flush();
+    }
+
+    // Determine how large a chunk of data to read back
+    uint32_t rchunk_size = rChunkGenerator.nextSize();
+    if (rchunk_size == 0 || rchunk_size > total_written - total_read) {
+      rchunk_size = total_written - total_read;
+    }
+
+    // Read the chunk
+    uint32_t chunk_read = 0;
+    while (chunk_read < rchunk_size) {
+      uint32_t read_size = rSizeGenerator.nextSize();
+      if (read_size == 0 || read_size > rchunk_size - chunk_read) {
+        read_size = rchunk_size - chunk_read;
+      }
+
+      int bytes_read = transports.in->read(rbuf.get() + total_read, read_size);
+      BOOST_REQUIRE_MESSAGE(bytes_read > 0,
+                            "read(pos=" << total_read << ", size=" <<
+                            read_size << ") returned " << bytes_read <<
+                            "; written so far: " << total_written << " / " <<
+                            totalSize << " bytes");
+      chunk_read += bytes_read;
+      total_read += bytes_read;
+    }
+  }
+
+  // make sure the data read back is identical to the data written
+  BOOST_CHECK_EQUAL(memcmp(rbuf.get(), wbuf.get(), totalSize), 0);
+}
+
+/**************************************************************************
+ * Test case generation
+ *
+ * Pretty ugly and annoying.  This would be much easier if we the unit test
+ * framework didn't force each test to be a separate function.
+ * - Writing a completely separate function definition for each of these would
+ *   result in a lot of repetitive boilerplate code.
+ * - Combining many tests into a single function makes it more difficult to
+ *   tell precisely which tests failed.  It also means you can't get a progress
+ *   update after each test, and the tests are already fairly slow.
+ * - Similar registration could be acheived with BOOST_TEST_CASE_TEMPLATE,
+ *   but it requires a lot of awkward MPL code, and results in useless test
+ *   case names.  (The names are generated from std::type_info::name(), which
+ *   is compiler-dependent.  gcc returns mangled names.)
+ **************************************************************************/
+
+#define TEST_RW(CoupledTransports, totalSize, ...) \
+  do { \
+    /* Add the test as specified, to test the non-virtual function calls */ \
+    addTest<CoupledTransports>(BOOST_STRINGIZE(CoupledTransports), \
+                               totalSize, ## __VA_ARGS__); \
+    /* \
+     * Also test using the transport as a TTransport*, to test \
+     * the read_virt()/write_virt() calls \
+     */ \
+    addTest< CoupledTTransports<CoupledTransports> >( \
+        BOOST_STRINGIZE(CoupledTTransports<CoupledTransports>), \
+                        totalSize, ## __VA_ARGS__); \
+  } while (0)
+
+#define TEST_RW_BUF(CoupledTransports, totalSize, ...) \
+  do { \
+    /* Add the standard tests */ \
+    TEST_RW(CoupledTransports, totalSize, ## __VA_ARGS__); \
+    /* Also test using the transport as a TBufferBase* */ \
+    addTest< CoupledBufferBases<CoupledTransports> >( \
+        BOOST_STRINGIZE(CoupledBufferBases<CoupledTransports>), \
+                        totalSize, ## __VA_ARGS__); \
+  } while (0)
+
+// We use the same tests for all of the buffered transports
+// This is a helper macro so we don't have to copy-and-paste them.
+#define BUFFER_TESTS(CoupledTransports) \
+  TEST_RW_BUF(CoupledTransports, 1024*1024*30, 0, 0); \
+  TEST_RW_BUF(CoupledTransports, 1024*1024*10, rand4k, rand4k); \
+  TEST_RW_BUF(CoupledTransports, 1024*1024*10, 167, 163); \
+  TEST_RW_BUF(CoupledTransports, 1024*1024, 1, 1); \
+  \
+  TEST_RW_BUF(CoupledTransports, 1024*1024*10, 0, 0, rand4k, rand4k); \
+  TEST_RW_BUF(CoupledTransports, 1024*1024*10, \
+              rand4k, rand4k, rand4k, rand4k); \
+  TEST_RW_BUF(CoupledTransports, 1024*1024*10, 167, 163, rand4k, rand4k); \
+  TEST_RW_BUF(CoupledTransports, 1024*1024*2, 1, 1, rand4k, rand4k);
+
+class TransportTestGen {
+ public:
+  TransportTestGen(boost::unit_test::test_suite* suite) : suite_(suite) {}
+
+  void generate() {
+    GenericSizeGenerator rand4k(1, 4096);
+
+    /*
+     * We do the basically the same set of tests for each transport type,
+     * although we tweak the parameters in some places.
+     */
+
+    // Buffered transport tests
+    BUFFER_TESTS(CoupledMemoryBuffers)
+    BUFFER_TESTS(CoupledBufferedTransports)
+    BUFFER_TESTS(CoupledFramedTransports)
+
+    // TFDTransport tests
+    // Since CoupledFDTransports tests with a pipe, writes will block
+    // if there is too much outstanding unread data in the pipe.
+    uint32_t fd_max_outstanding = 4096;
+    TEST_RW(CoupledFDTransports, 1024*1024*30, 0, 0,
+            0, 0, fd_max_outstanding);
+    TEST_RW(CoupledFDTransports, 1024*1024*10, rand4k, rand4k,
+            0, 0, fd_max_outstanding);
+    TEST_RW(CoupledFDTransports, 1024*1024*10, 167, 163,
+            0, 0, fd_max_outstanding);
+    TEST_RW(CoupledFDTransports, 1024*512, 1, 1,
+            0, 0, fd_max_outstanding);
+
+    TEST_RW(CoupledFDTransports, 1024*1024*10, 0, 0,
+            rand4k, rand4k, fd_max_outstanding);
+    TEST_RW(CoupledFDTransports, 1024*1024*10, rand4k, rand4k,
+            rand4k, rand4k, fd_max_outstanding);
+    TEST_RW(CoupledFDTransports, 1024*1024*10, 167, 163,
+            rand4k, rand4k, fd_max_outstanding);
+    TEST_RW(CoupledFDTransports, 1024*512, 1, 1,
+            rand4k, rand4k, fd_max_outstanding);
+
+    // TFileTransport tests
+    // We use smaller buffer sizes here, since TFileTransport is fairly slow.
+    //
+    // TFileTransport can't write more than 16MB at once
+    uint32_t max_write_at_once = 1024*1024*16 - 4;
+    TEST_RW(CoupledFileTransports, 1024*1024*30, max_write_at_once, 0);
+    TEST_RW(CoupledFileTransports, 1024*1024*5, rand4k, rand4k);
+    TEST_RW(CoupledFileTransports, 1024*1024*5, 167, 163);
+    TEST_RW(CoupledFileTransports, 1024*64, 1, 1);
+
+    TEST_RW(CoupledFileTransports, 1024*1024*2, 0, 0, rand4k, rand4k);
+    TEST_RW(CoupledFileTransports, 1024*1024*2,
+            rand4k, rand4k, rand4k, rand4k);
+    TEST_RW(CoupledFileTransports, 1024*1024*2, 167, 163, rand4k, rand4k);
+    TEST_RW(CoupledFileTransports, 1024*64, 1, 1, rand4k, rand4k);
+  }
+
+ private:
+  template <class CoupledTransports>
+  void addTest(const char* transport_name, uint32_t totalSize,
+               GenericSizeGenerator wSizeGen, GenericSizeGenerator rSizeGen,
+               GenericSizeGenerator wChunkSizeGen = 0,
+               GenericSizeGenerator rChunkSizeGen = 0,
+               uint32_t maxOutstanding = 0,
+               uint32_t expectedFailures = 0) {
+    std::ostringstream name;
+    name << transport_name << "::test_rw(" << totalSize << ", " <<
+      wSizeGen.describe() << ", " << rSizeGen.describe() << ", " <<
+      wChunkSizeGen.describe() << ", " << rChunkSizeGen.describe() << ", " <<
+      maxOutstanding << ")";
+
+    boost::unit_test::callback0<> test_func =
+      std::tr1::bind(test_rw<CoupledTransports>, totalSize,
+                     wSizeGen, rSizeGen, wChunkSizeGen, rChunkSizeGen,
+                     maxOutstanding);
+    boost::unit_test::test_case* tc =
+      boost::unit_test::make_test_case(test_func, name.str());
+    suite_->add(tc, expectedFailures);
+  };
+
+  boost::unit_test::test_suite* suite_;
+};
+
+/**************************************************************************
+ * General Initialization
+ **************************************************************************/
+
+void print_usage(FILE* f, const char* argv0) {
+  fprintf(f, "Usage: %s [boost_options] [options]\n", argv0);
+  fprintf(f, "Options:\n");
+  fprintf(f, "  --seed=<N>, -s <N>\n");
+  fprintf(f, "  --tmp-dir=DIR, -t DIR\n");
+  fprintf(f, "  --help\n");
+}
+
+void parse_args(int argc, char* argv[]) {
+  int seed;
+  int *seedptr = NULL;
+
+  struct option long_opts[] = {
+    { "help", false, NULL, 'h' },
+    { "seed", true, NULL, 's' },
+    { "tmp-dir", true, NULL, 't' },
+    { NULL, 0, NULL, 0 }
+  };
+
+  while (true) {
+    optopt = 1;
+    int optchar = getopt_long(argc, argv, "hs:t:", long_opts, NULL);
+    if (optchar == -1) {
+      break;
+    }
+
+    switch (optchar) {
+      case 't':
+        tmp_dir = optarg;
+        break;
+      case 's': {
+        char *endptr;
+        seed = strtol(optarg, &endptr, 0);
+        if (endptr == optarg || *endptr != '\0') {
+          fprintf(stderr, "invalid seed value \"%s\": must be an integer\n",
+                  optarg);
+          exit(1);
+        }
+        seedptr = &seed;
+        break;
+      }
+      case 'h':
+        print_usage(stdout, argv[0]);
+        exit(0);
+      case '?':
+        exit(1);
+      default:
+        // Only happens if someone adds another option to the optarg string,
+        // but doesn't update the switch statement to handle it.
+        fprintf(stderr, "unknown option \"-%c\"\n", optchar);
+        exit(1);
+    }
+  }
+
+  initrand(seedptr);
+}
+
+boost::unit_test::test_suite* init_unit_test_suite(int argc, char* argv[]) {
+  // Parse arguments
+  parse_args(argc, argv);
+
+  boost::unit_test::test_suite* suite = BOOST_TEST_SUITE("TransportTests");
+  TransportTestGen transport_test_generator(suite);
+  transport_test_generator.generate();
+
+  return suite;
+}