-- fix read timeout handling in TSocket

Summary:
- turns out that EAGAIN can be returned both when there is a transmission timeout and when there
  is a lack of system resources.

This diff has a hacky fix for respecting a user specified read timeout.

Reviewed By: Steve Grimm, Marc, Slee

Test Plan:
- Tested by trying to crash an srp machine

Revert Plan: No.

Notes:
- Also added functionality to allow users to specify the max number of recv retries (in the case
  when EAGAIN is returned due to a lack of system resources)


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665121 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/Exception.h b/lib/cpp/src/concurrency/Exception.h
index 735cd87..575a3ed 100644
--- a/lib/cpp/src/concurrency/Exception.h
+++ b/lib/cpp/src/concurrency/Exception.h
@@ -20,9 +20,19 @@
 
 class IllegalStateException : public facebook::thrift::TException {};
 
-class TimedOutException : public facebook::thrift::TException {};
+class TimedOutException : public facebook::thrift::TException {
+public:                                            
+  TimedOutException():TException("TimedOutException"){};
+  TimedOutException(const std::string& message ) : 
+    TException(message) {}
+};
 
-class TooManyPendingTasksException : public facebook::thrift::TException {};
+class TooManyPendingTasksException : public facebook::thrift::TException {
+public:                                            
+  TooManyPendingTasksException():TException("TooManyPendingTasksException"){};
+  TooManyPendingTasksException(const std::string& message ) : 
+    TException(message) {}
+};
 
 class SystemResourceException : public facebook::thrift::TException {
 public:
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index 32fc2e7..0777807 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -36,9 +36,6 @@
 // Mutex to protect syscalls to netdb
 static Monitor s_netdb_monitor;
 
-// TODO(mcslee): Make this an option to the socket class
-#define MAX_RECV_RETRIES 20
-  
 TSocket::TSocket(string host, int port) : 
   host_(host),
   port_(port),
@@ -48,7 +45,8 @@
   recvTimeout_(0),
   lingerOn_(1),
   lingerVal_(0),
-  noDelay_(1) {
+  noDelay_(1),
+  maxRecvRetries_(5) {
   recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
   recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
 }
@@ -62,7 +60,8 @@
   recvTimeout_(0),
   lingerOn_(1),
   lingerVal_(0),
-  noDelay_(1) {
+  noDelay_(1),
+  maxRecvRetries_(5) {
   recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
   recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
 }
@@ -76,7 +75,8 @@
   recvTimeout_(0),
   lingerOn_(1),
   lingerVal_(0),
-  noDelay_(1) {
+  noDelay_(1),
+  maxRecvRetries_(5) {
   recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
   recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
 }
@@ -235,23 +235,52 @@
     throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket");
   }
 
-  uint32_t retries = 0;
-  
- try_again:
+  int32_t retries = 0;
+
+  // EAGAIN can be signalled both when a timeout has occurred and when
+  // the system is out of resources (an awesome undocumented feature).
+  // The following is an approximation of the time interval under which
+  // EAGAIN is taken to indicate an out of resources error.
+  uint32_t eagainThresholdMicros = 0;
+  if (recvTimeout_) {
+    // if a readTimeout is specified along with a max number of recv retries, then 
+    // the threshold will ensure that the read timeout is not exceeded even in the
+    // case of resource errors
+    eagainThresholdMicros = (recvTimeout_*1000)/ ((maxRecvRetries_>0) ? maxRecvRetries_ : 2);
+  }
+
+ try_again:  
   // Read from the socket
+  struct timeval begin;
+  gettimeofday(&begin, NULL);
   int got = recv(socket_, buf, len, 0);
+  struct timeval end;
+  gettimeofday(&end, NULL);
+  uint32_t readElapsedMicros =  (((end.tv_sec - begin.tv_sec) * 1000 * 1000)
+                                 + (((uint64_t)(end.tv_usec - begin.tv_usec))));
   ++g_socket_syscalls;
-  
+
   // Check for error on read
   if (got < 0) {   
-    // If temporarily out of resources, sleep a bit and try again
-    if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) {
-      usleep(50);
-      goto try_again;
+    if (errno == EAGAIN) {
+      // check if this is the lack of resources or timeout case
+      if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) {
+        if (retries++ < maxRecvRetries_) {
+          usleep(50);
+          goto try_again;
+        } else {
+          throw TTransportException(TTransportException::TIMED_OUT, 
+                                    "EAGAIN (unavailable resources)");
+        }
+      } else {
+        // infer that timeout has been hit
+        throw TTransportException(TTransportException::TIMED_OUT, 
+                                  "EAGAIN (timed out)");
+      }
     }
     
     // If interrupted, try again
-    if (errno == EINTR && retries++ < MAX_RECV_RETRIES) {
+    if (errno == EINTR && retries++ < maxRecvRetries_) {
       goto try_again;
     }
     
@@ -407,4 +436,8 @@
   }
 }
 
+void TSocket::setMaxRecvRetries(int maxRecvRetries) {
+  maxRecvRetries_ = maxRecvRetries;
+}
+
 }}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h
index b00f6ff..515172d 100644
--- a/lib/cpp/src/transport/TSocket.h
+++ b/lib/cpp/src/transport/TSocket.h
@@ -19,6 +19,7 @@
  * TCP Socket implementation of the TTransport interface.
  *
  * @author Mark Slee <mcslee@facebook.com>
+ * @author Aditya Agarwal <aditya@facebook.com>
  */
 class TSocket : public TTransport {
   /**
@@ -130,6 +131,12 @@
    */
   void setSendTimeout(int ms);
 
+  /**
+   * Set the max number of recv retries in case of an EAGAIN
+   * error
+   */
+  void setMaxRecvRetries(int maxRecvRetries);
+
  protected:
   /**
    * Constructor to create socket from raw UNIX handle. Never called directly
@@ -164,6 +171,9 @@
   /** Nodelay */
   bool noDelay_;
 
+  /** Recv EGAIN retries */
+  int maxRecvRetries_;
+
   /** Recv timeout timeval */
   struct timeval recvTimeval_;
 };