THRIFT-96. cpp: TSocket.peek fails on FreeBSD

Author: Alexander Shigin

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@750585 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TBufferTransports.h b/lib/cpp/src/transport/TBufferTransports.h
index 9d9510d..4e3f187 100644
--- a/lib/cpp/src/transport/TBufferTransports.h
+++ b/lib/cpp/src/transport/TBufferTransports.h
@@ -156,50 +156,15 @@
 };
 
 
-/**
- * Buffered transport. For reads it will read more data than is requested
- * and will serve future data out of a local buffer. For writes, data is
- * stored to an in memory buffer before being written out.
- *
- * @author Mark Slee <mcslee@facebook.com>
- * @author David Reiss <dreiss@facebook.com>
+/**
+ * Base class for transports that wrap another (underlying) transport.
  */
-class TBufferedTransport : public TBufferBase {
+class TUnderlyingTransport : public TBufferBase {
  public:
-
   static const int DEFAULT_BUFFER_SIZE = 512;
 
-  /// Use default buffer sizes.
-  TBufferedTransport(boost::shared_ptr<TTransport> transport)
-    : transport_(transport)
-    , rBufSize_(DEFAULT_BUFFER_SIZE)
-    , wBufSize_(DEFAULT_BUFFER_SIZE)
-    , rBuf_(new uint8_t[rBufSize_])
-    , wBuf_(new uint8_t[wBufSize_])
-  {
-    initPointers();
-  }
-
-  /// Use specified buffer sizes.
-  TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
-    : transport_(transport)
-    , rBufSize_(sz)
-    , wBufSize_(sz)
-    , rBuf_(new uint8_t[rBufSize_])
-    , wBuf_(new uint8_t[wBufSize_])
-  {
-    initPointers();
-  }
-
-  /// Use specified read and write buffer sizes.
-  TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz)
-    : transport_(transport)
-    , rBufSize_(rsz)
-    , wBufSize_(wsz)
-    , rBuf_(new uint8_t[rBufSize_])
-    , wBuf_(new uint8_t[wBufSize_])
-  {
-    initPointers();
+  virtual bool peek() {
+    return (rBase_ < rBound_) || transport_->peek();
   }
 
   void open() {
@@ -210,18 +175,84 @@
     return transport_->isOpen();
   }
 
-  bool peek() {
-    if (rBase_ == rBound_) {
-      setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
-    }
-    return (rBound_ > rBase_);
-  }
-
   void close() {
     flush();
     transport_->close();
   }
 
+  boost::shared_ptr<TTransport> getUnderlyingTransport() {
+    return transport_;
+  }
+
+ protected:
+  boost::shared_ptr<TTransport> transport_;
+
+  uint32_t rBufSize_;
+  uint32_t wBufSize_;
+  boost::scoped_array<uint8_t> rBuf_;
+  boost::scoped_array<uint8_t> wBuf_;
+
+  TUnderlyingTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
+    : transport_(transport)
+    , rBufSize_(sz)
+    , wBufSize_(sz)
+    , rBuf_(new uint8_t[rBufSize_])
+    , wBuf_(new uint8_t[wBufSize_]) {}
+
+  TUnderlyingTransport(boost::shared_ptr<TTransport> transport)
+    : transport_(transport)
+    , rBufSize_(DEFAULT_BUFFER_SIZE)
+    , wBufSize_(DEFAULT_BUFFER_SIZE)
+    , rBuf_(new uint8_t[rBufSize_])
+    , wBuf_(new uint8_t[wBufSize_]) {}
+
+  TUnderlyingTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz)
+    : transport_(transport)
+    , rBufSize_(rsz)
+    , wBufSize_(wsz)
+    , rBuf_(new uint8_t[rBufSize_])
+    , wBuf_(new uint8_t[wBufSize_]) {}
+};
+
+/**
+ * Buffered transport. For reads it will read more data than is requested
+ * and will serve future data out of a local buffer. For writes, data is
+ * stored to an in memory buffer before being written out.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ * @author David Reiss <dreiss@facebook.com>
+ */
+class TBufferedTransport : public TUnderlyingTransport {
+ public:
+
+  /// Use default buffer sizes.
+  TBufferedTransport(boost::shared_ptr<TTransport> transport)
+    : TUnderlyingTransport(transport)
+  {
+    initPointers();
+  }
+
+  /// Use specified buffer sizes.
+  TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
+    : TUnderlyingTransport(transport, sz)
+  {
+    initPointers();
+  }
+
+  /// Use specified read and write buffer sizes.
+  TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz)
+    : TUnderlyingTransport(transport, rsz, wsz)
+  {
+    initPointers();
+  }
+
+  virtual bool peek() {
+    /* shigin: see THRIFT-96 discussion */
+    if (rBase_ == rBound_) {
+      setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
+    }
+    return (rBound_ > rBase_);
+  }
   virtual uint32_t readSlow(uint8_t* buf, uint32_t len);
 
   virtual void writeSlow(const uint8_t* buf, uint32_t len);
@@ -242,23 +273,12 @@
    */
   virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
 
-  boost::shared_ptr<TTransport> getUnderlyingTransport() {
-    return transport_;
-  }
-
  protected:
   void initPointers() {
     setReadBuffer(rBuf_.get(), 0);
     setWriteBuffer(wBuf_.get(), wBufSize_);
     // Write size never changes.
   }
-
-  boost::shared_ptr<TTransport> transport_;
-
-  uint32_t rBufSize_;
-  uint32_t wBufSize_;
-  boost::scoped_array<uint8_t> rBuf_;
-  boost::scoped_array<uint8_t> wBuf_;
 };
 
 
@@ -292,49 +312,22 @@
  * @author Mark Slee <mcslee@facebook.com>
  * @author David Reiss <dreiss@facebook.com>
  */
-class TFramedTransport : public TBufferBase {
+class TFramedTransport : public TUnderlyingTransport {
  public:
 
-  static const int DEFAULT_BUFFER_SIZE = 512;
-
   /// Use default buffer sizes.
   TFramedTransport(boost::shared_ptr<TTransport> transport)
-    : transport_(transport)
-    , rBufSize_(0)
-    , wBufSize_(DEFAULT_BUFFER_SIZE)
-    , rBuf_()
-    , wBuf_(new uint8_t[wBufSize_])
+    : TUnderlyingTransport(transport)
   {
     initPointers();
   }
 
   TFramedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
-    : transport_(transport)
-    , rBufSize_(0)
-    , wBufSize_(sz)
-    , rBuf_()
-    , wBuf_(new uint8_t[wBufSize_])
+    : TUnderlyingTransport(transport, sz)
   {
     initPointers();
   }
 
-  void open() {
-    transport_->open();
-  }
-
-  bool isOpen() {
-    return transport_->isOpen();
-  }
-
-  bool peek() {
-    return (rBase_ < rBound_) || transport_->peek();
-  }
-
-  void close() {
-    flush();
-    transport_->close();
-  }
-
   virtual uint32_t readSlow(uint8_t* buf, uint32_t len);
 
   virtual void writeSlow(const uint8_t* buf, uint32_t len);
@@ -343,10 +336,6 @@
 
   const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
 
-  boost::shared_ptr<TTransport> getUnderlyingTransport() {
-    return transport_;
-  }
-
  protected:
   /**
    * Reads a frame of input from the underlying stream.
@@ -361,13 +350,6 @@
     int32_t pad = 0;
     this->write((uint8_t*)&pad, sizeof(pad));
   }
-
-  boost::shared_ptr<TTransport> transport_;
-
-  uint32_t rBufSize_;
-  uint32_t wBufSize_;
-  boost::scoped_array<uint8_t> rBuf_;
-  boost::scoped_array<uint8_t> wBuf_;
 };
 
 /**
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index 04cd81e..1696053 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -96,6 +96,17 @@
   int r = recv(socket_, &buf, 1, MSG_PEEK);
   if (r == -1) {
     int errno_copy = errno;
+    #ifdef __FreeBSD__
+    /* shigin:
+     * FreeBSD returns -1 with errno ECONNRESET if the socket was closed by
+     * the other side.
+     */
+    if (errno_copy == ECONNRESET)
+    {
+      close();
+      return false;
+    }
+    #endif
     GlobalOutput.perror("TSocket::peek() recv() " + getSocketInfo(), errno_copy);
     throw TTransportException(TTransportException::UNKNOWN, "recv()", errno_copy);
   }
@@ -284,6 +295,7 @@
   struct timeval begin;
   gettimeofday(&begin, NULL);
   int got = recv(socket_, buf, len, 0);
+  int errno_copy = errno; //gettimeofday can change errno
   struct timeval end;
   gettimeofday(&end, NULL);
   uint32_t readElapsedMicros =  (((end.tv_sec - begin.tv_sec) * 1000 * 1000)
@@ -292,7 +304,7 @@
 
   // Check for error on read
   if (got < 0) {
-    if (errno == EAGAIN) {
+    if (errno_copy == EAGAIN) {
       // check if this is the lack of resources or timeout case
       if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) {
         if (retries++ < maxRecvRetries_) {
@@ -310,31 +322,37 @@
     }
 
     // If interrupted, try again
-    if (errno == EINTR && retries++ < maxRecvRetries_) {
+    if (errno_copy == EINTR && retries++ < maxRecvRetries_) {
       goto try_again;
     }
 
     // Now it's not a try again case, but a real probblez
-    int errno_copy = errno;  // Copy errno because we're allocating memory.
     GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);
 
     // If we disconnect with no linger time
-    if (errno == ECONNRESET) {
+    if (errno_copy == ECONNRESET) {
+      #ifdef __FreeBSD__
+      /* shigin: FreeBSD doesn't follow the POSIX semantics of recv() and fails
+       * with ECONNRESET if the peer performed a shutdown.
+       */
+      close();
+      return 0;
+      #else
       throw TTransportException(TTransportException::NOT_OPEN, "ECONNRESET");
+      #endif
     }
 
     // This ish isn't open
-    if (errno == ENOTCONN) {
+    if (errno_copy == ENOTCONN) {
       throw TTransportException(TTransportException::NOT_OPEN, "ENOTCONN");
     }
 
     // Timed out!
-    if (errno == ETIMEDOUT) {
+    if (errno_copy == ETIMEDOUT) {
       throw TTransportException(TTransportException::TIMED_OUT, "ETIMEDOUT");
     }
 
     // Some other error, whatevz
-    errno_copy = errno;
     throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
   }