-- fix read timeout handling in TSocket
Summary:
- It turns out that recv() can fail with EAGAIN both when a transmission timeout expires and when
the system is short of resources.
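This diff has a hacky fix for respecting a user-specified read timeout: an EAGAIN that arrives sooner than roughly recvTimeout / maxRecvRetries is treated as a resource shortage and retried; otherwise it is reported as a timeout.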
Reviewed By: Steve Grimm, Marc, Slee
Test Plan:
- Tested by trying to crash an srp machine
Revert Plan: No.
Notes:
- Also added setMaxRecvRetries() to allow users to specify the max number of recv retries (in the case
when EAGAIN is returned due to a lack of system resources). The default is now 5, replacing the hard-coded MAX_RECV_RETRIES of 20.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665121 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/Exception.h b/lib/cpp/src/concurrency/Exception.h
index 735cd87..575a3ed 100644
--- a/lib/cpp/src/concurrency/Exception.h
+++ b/lib/cpp/src/concurrency/Exception.h
@@ -20,9 +20,19 @@
class IllegalStateException : public facebook::thrift::TException {};
-class TimedOutException : public facebook::thrift::TException {};
+class TimedOutException : public facebook::thrift::TException { // TException subclass; previously an empty class, now able to carry a message.
+public:
+ TimedOutException():TException("TimedOutException"){}; // Default: passes the class name to TException as the message.
+ TimedOutException(const std::string& message ) : // Caller-supplied message, forwarded unchanged to TException.
+ TException(message) {}
+};
-class TooManyPendingTasksException : public facebook::thrift::TException {};
+class TooManyPendingTasksException : public facebook::thrift::TException { // TException subclass; previously an empty class, now able to carry a message.
+public:
+ TooManyPendingTasksException():TException("TooManyPendingTasksException"){}; // Default: passes the class name to TException as the message.
+ TooManyPendingTasksException(const std::string& message ) : // Caller-supplied message, forwarded unchanged to TException.
+ TException(message) {}
+};
class SystemResourceException : public facebook::thrift::TException {
public:
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index 32fc2e7..0777807 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -36,9 +36,6 @@
// Mutex to protect syscalls to netdb
static Monitor s_netdb_monitor;
-// TODO(mcslee): Make this an option to the socket class
-#define MAX_RECV_RETRIES 20
-
TSocket::TSocket(string host, int port) :
host_(host),
port_(port),
@@ -48,7 +45,8 @@
recvTimeout_(0),
lingerOn_(1),
lingerVal_(0),
- noDelay_(1) {
+ noDelay_(1),
+ maxRecvRetries_(5) {
recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
}
@@ -62,7 +60,8 @@
recvTimeout_(0),
lingerOn_(1),
lingerVal_(0),
- noDelay_(1) {
+ noDelay_(1),
+ maxRecvRetries_(5) {
recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
}
@@ -76,7 +75,8 @@
recvTimeout_(0),
lingerOn_(1),
lingerVal_(0),
- noDelay_(1) {
+ noDelay_(1),
+ maxRecvRetries_(5) {
recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
}
@@ -235,23 +235,52 @@
throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket");
}
- uint32_t retries = 0;
-
- try_again:
+ int32_t retries = 0;
+
+ // EAGAIN can be signalled both when a timeout has occurred and when
+ // the system is out of resources (an awesome undocumented feature).
+ // The following is an approximation of the time interval under which
+ // EAGAIN is taken to indicate an out of resources error.
+ uint32_t eagainThresholdMicros = 0;
+ if (recvTimeout_) {
+ // if a readTimeout is specified along with a max number of recv retries, then
+ // the threshold will ensure that the read timeout is not exceeded even in the
+ // case of resource errors
+ eagainThresholdMicros = (recvTimeout_*1000)/ ((maxRecvRetries_>0) ? maxRecvRetries_ : 2);
+ }
+
+ try_again:
// Read from the socket
+ struct timeval begin;
+ gettimeofday(&begin, NULL);
int got = recv(socket_, buf, len, 0);
+ struct timeval end;
+ gettimeofday(&end, NULL);
+ uint32_t readElapsedMicros = (((end.tv_sec - begin.tv_sec) * 1000 * 1000)
+ + (((uint64_t)(end.tv_usec - begin.tv_usec))));
++g_socket_syscalls;
-
+
// Check for error on read
if (got < 0) {
- // If temporarily out of resources, sleep a bit and try again
- if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) {
- usleep(50);
- goto try_again;
+ if (errno == EAGAIN) {
+ // check if this is the lack of resources or timeout case
+ if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) {
+ if (retries++ < maxRecvRetries_) {
+ usleep(50);
+ goto try_again;
+ } else {
+ throw TTransportException(TTransportException::TIMED_OUT,
+ "EAGAIN (unavailable resources)");
+ }
+ } else {
+ // infer that timeout has been hit
+ throw TTransportException(TTransportException::TIMED_OUT,
+ "EAGAIN (timed out)");
+ }
}
// If interrupted, try again
- if (errno == EINTR && retries++ < MAX_RECV_RETRIES) {
+ if (errno == EINTR && retries++ < maxRecvRetries_) {
goto try_again;
}
@@ -407,4 +436,8 @@
}
}
+void TSocket::setMaxRecvRetries(int maxRecvRetries) { // Sets the cap that the recv() retry loop in this file applies after EAGAIN/EINTR.
+ maxRecvRetries_ = maxRecvRetries; // Stored as-is with no validation; a value <= 0 makes the `retries++ < maxRecvRetries_` checks fail at once, so no retry happens.
+}
+
}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h
index b00f6ff..515172d 100644
--- a/lib/cpp/src/transport/TSocket.h
+++ b/lib/cpp/src/transport/TSocket.h
@@ -19,6 +19,7 @@
* TCP Socket implementation of the TTransport interface.
*
* @author Mark Slee <mcslee@facebook.com>
+ * @author Aditya Agarwal <aditya@facebook.com>
*/
class TSocket : public TTransport {
/**
@@ -130,6 +131,12 @@
*/
void setSendTimeout(int ms);
+ /**
+ * Set the max number of recv retries in case of an EAGAIN
+ * error
+ */
+ void setMaxRecvRetries(int maxRecvRetries);
+
protected:
/**
* Constructor to create socket from raw UNIX handle. Never called directly
@@ -164,6 +171,9 @@
/** Nodelay */
bool noDelay_;
+ /** Max recv retries on EAGAIN */
+ int maxRecvRetries_;
+
/** Recv timeout timeval */
struct timeval recvTimeval_;
};