THRIFT-96. cpp: TSocket.peek fails on FreeBSD
Author: Alexander Shigin
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@750585 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TBufferTransports.h b/lib/cpp/src/transport/TBufferTransports.h
index 9d9510d..4e3f187 100644
--- a/lib/cpp/src/transport/TBufferTransports.h
+++ b/lib/cpp/src/transport/TBufferTransports.h
@@ -156,50 +156,15 @@
};
-/**
- * Buffered transport. For reads it will read more data than is requested
- * and will serve future data out of a local buffer. For writes, data is
- * stored to an in memory buffer before being written out.
- *
- * @author Mark Slee <mcslee@facebook.com>
- * @author David Reiss <dreiss@facebook.com>
+/**
+ * Base class for transports that wrap another (underlying) transport.
*/
-class TBufferedTransport : public TBufferBase {
+class TUnderlyingTransport : public TBufferBase {
public:
-
static const int DEFAULT_BUFFER_SIZE = 512;
- /// Use default buffer sizes.
- TBufferedTransport(boost::shared_ptr<TTransport> transport)
- : transport_(transport)
- , rBufSize_(DEFAULT_BUFFER_SIZE)
- , wBufSize_(DEFAULT_BUFFER_SIZE)
- , rBuf_(new uint8_t[rBufSize_])
- , wBuf_(new uint8_t[wBufSize_])
- {
- initPointers();
- }
-
- /// Use specified buffer sizes.
- TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
- : transport_(transport)
- , rBufSize_(sz)
- , wBufSize_(sz)
- , rBuf_(new uint8_t[rBufSize_])
- , wBuf_(new uint8_t[wBufSize_])
- {
- initPointers();
- }
-
- /// Use specified read and write buffer sizes.
- TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz)
- : transport_(transport)
- , rBufSize_(rsz)
- , wBufSize_(wsz)
- , rBuf_(new uint8_t[rBufSize_])
- , wBuf_(new uint8_t[wBufSize_])
- {
- initPointers();
+ virtual bool peek() {
+ return (rBase_ < rBound_) || transport_->peek();
}
void open() {
@@ -210,18 +175,84 @@
return transport_->isOpen();
}
- bool peek() {
- if (rBase_ == rBound_) {
- setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
- }
- return (rBound_ > rBase_);
- }
-
void close() {
flush();
transport_->close();
}
+ boost::shared_ptr<TTransport> getUnderlyingTransport() {
+ return transport_;
+ }
+
+ protected:
+ boost::shared_ptr<TTransport> transport_;
+
+ uint32_t rBufSize_;
+ uint32_t wBufSize_;
+ boost::scoped_array<uint8_t> rBuf_;
+ boost::scoped_array<uint8_t> wBuf_;
+
+ TUnderlyingTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
+ : transport_(transport)
+ , rBufSize_(sz)
+ , wBufSize_(sz)
+ , rBuf_(new uint8_t[rBufSize_])
+ , wBuf_(new uint8_t[wBufSize_]) {}
+
+ TUnderlyingTransport(boost::shared_ptr<TTransport> transport)
+ : transport_(transport)
+ , rBufSize_(DEFAULT_BUFFER_SIZE)
+ , wBufSize_(DEFAULT_BUFFER_SIZE)
+ , rBuf_(new uint8_t[rBufSize_])
+ , wBuf_(new uint8_t[wBufSize_]) {}
+
+ TUnderlyingTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz)
+ : transport_(transport)
+ , rBufSize_(rsz)
+ , wBufSize_(wsz)
+ , rBuf_(new uint8_t[rBufSize_])
+ , wBuf_(new uint8_t[wBufSize_]) {}
+};
+
+/**
+ * Buffered transport. For reads it will read more data than is requested
+ * and will serve future data out of a local buffer. For writes, data is
+ * stored to an in memory buffer before being written out.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ * @author David Reiss <dreiss@facebook.com>
+ */
+class TBufferedTransport : public TUnderlyingTransport {
+ public:
+
+ /// Use default buffer sizes.
+ TBufferedTransport(boost::shared_ptr<TTransport> transport)
+ : TUnderlyingTransport(transport)
+ {
+ initPointers();
+ }
+
+ /// Use specified buffer sizes.
+ TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
+ : TUnderlyingTransport(transport, sz)
+ {
+ initPointers();
+ }
+
+ /// Use specified read and write buffer sizes.
+ TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz)
+ : TUnderlyingTransport(transport, rsz, wsz)
+ {
+ initPointers();
+ }
+
+ virtual bool peek() {
+ /* shigin: see THRIFT-96 discussion */
+ if (rBase_ == rBound_) {
+ setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
+ }
+ return (rBound_ > rBase_);
+ }
virtual uint32_t readSlow(uint8_t* buf, uint32_t len);
virtual void writeSlow(const uint8_t* buf, uint32_t len);
@@ -242,23 +273,12 @@
*/
virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
- boost::shared_ptr<TTransport> getUnderlyingTransport() {
- return transport_;
- }
-
protected:
void initPointers() {
setReadBuffer(rBuf_.get(), 0);
setWriteBuffer(wBuf_.get(), wBufSize_);
// Write size never changes.
}
-
- boost::shared_ptr<TTransport> transport_;
-
- uint32_t rBufSize_;
- uint32_t wBufSize_;
- boost::scoped_array<uint8_t> rBuf_;
- boost::scoped_array<uint8_t> wBuf_;
};
@@ -292,49 +312,22 @@
* @author Mark Slee <mcslee@facebook.com>
* @author David Reiss <dreiss@facebook.com>
*/
-class TFramedTransport : public TBufferBase {
+class TFramedTransport : public TUnderlyingTransport {
public:
- static const int DEFAULT_BUFFER_SIZE = 512;
-
/// Use default buffer sizes.
TFramedTransport(boost::shared_ptr<TTransport> transport)
- : transport_(transport)
- , rBufSize_(0)
- , wBufSize_(DEFAULT_BUFFER_SIZE)
- , rBuf_()
- , wBuf_(new uint8_t[wBufSize_])
+ : TUnderlyingTransport(transport)
{
initPointers();
}
TFramedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
- : transport_(transport)
- , rBufSize_(0)
- , wBufSize_(sz)
- , rBuf_()
- , wBuf_(new uint8_t[wBufSize_])
+ : TUnderlyingTransport(transport, sz)
{
initPointers();
}
- void open() {
- transport_->open();
- }
-
- bool isOpen() {
- return transport_->isOpen();
- }
-
- bool peek() {
- return (rBase_ < rBound_) || transport_->peek();
- }
-
- void close() {
- flush();
- transport_->close();
- }
-
virtual uint32_t readSlow(uint8_t* buf, uint32_t len);
virtual void writeSlow(const uint8_t* buf, uint32_t len);
@@ -343,10 +336,6 @@
const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
- boost::shared_ptr<TTransport> getUnderlyingTransport() {
- return transport_;
- }
-
protected:
/**
* Reads a frame of input from the underlying stream.
@@ -361,13 +350,6 @@
int32_t pad = 0;
this->write((uint8_t*)&pad, sizeof(pad));
}
-
- boost::shared_ptr<TTransport> transport_;
-
- uint32_t rBufSize_;
- uint32_t wBufSize_;
- boost::scoped_array<uint8_t> rBuf_;
- boost::scoped_array<uint8_t> wBuf_;
};
/**
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index 04cd81e..1696053 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -96,6 +96,17 @@
int r = recv(socket_, &buf, 1, MSG_PEEK);
if (r == -1) {
int errno_copy = errno;
+ #ifdef __FreeBSD__
+ /* shigin:
+ * FreeBSD's recv() returns -1 with errno ECONNRESET if the socket was
+ * closed by the other side
+ */
+ if (errno_copy == ECONNRESET)
+ {
+ close();
+ return false;
+ }
+ #endif
GlobalOutput.perror("TSocket::peek() recv() " + getSocketInfo(), errno_copy);
throw TTransportException(TTransportException::UNKNOWN, "recv()", errno_copy);
}
@@ -284,6 +295,7 @@
struct timeval begin;
gettimeofday(&begin, NULL);
int got = recv(socket_, buf, len, 0);
+ int errno_copy = errno; //gettimeofday can change errno
struct timeval end;
gettimeofday(&end, NULL);
uint32_t readElapsedMicros = (((end.tv_sec - begin.tv_sec) * 1000 * 1000)
@@ -292,7 +304,7 @@
// Check for error on read
if (got < 0) {
- if (errno == EAGAIN) {
+ if (errno_copy == EAGAIN) {
// check if this is the lack of resources or timeout case
if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) {
if (retries++ < maxRecvRetries_) {
@@ -310,31 +322,37 @@
}
// If interrupted, try again
- if (errno == EINTR && retries++ < maxRecvRetries_) {
+ if (errno_copy == EINTR && retries++ < maxRecvRetries_) {
goto try_again;
}
// Now it's not a try again case, but a real probblez
- int errno_copy = errno; // Copy errno because we're allocating memory.
GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);
// If we disconnect with no linger time
- if (errno == ECONNRESET) {
+ if (errno_copy == ECONNRESET) {
+ #ifdef __FreeBSD__
+ /* shigin: FreeBSD doesn't follow the POSIX semantics of recv() and fails
+ * with ECONNRESET if the peer performed a shutdown
+ */
+ close();
+ return 0;
+ #else
throw TTransportException(TTransportException::NOT_OPEN, "ECONNRESET");
+ #endif
}
// This ish isn't open
- if (errno == ENOTCONN) {
+ if (errno_copy == ENOTCONN) {
throw TTransportException(TTransportException::NOT_OPEN, "ENOTCONN");
}
// Timed out!
- if (errno == ETIMEDOUT) {
+ if (errno_copy == ETIMEDOUT) {
throw TTransportException(TTransportException::TIMED_OUT, "ETIMEDOUT");
}
// Some other error, whatevz
- errno_copy = errno;
throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
}