Thrift: C++ peek() method and TException not Exception


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664876 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/Thrift.h b/lib/cpp/src/Thrift.h
index 92143cc..e497255 100644
--- a/lib/cpp/src/Thrift.h
+++ b/lib/cpp/src/Thrift.h
@@ -14,15 +14,21 @@
 
 namespace facebook { namespace thrift {
 
-class Exception : public std::exception {
+class TException : public std::exception {
 public:
-  Exception(const std::string message) :
+  TException() {}
+
+  TException(const std::string message) :
     message_(message) {}
 
-  ~Exception() throw () {}
+  ~TException() throw() {}
 
   const char* what() {
-    return message_.c_str();
+    if (message_.empty()) {
+      return "Default TException.";
+    } else {
+      return message_.c_str();
+    }
   }
 
 private:
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
index 74a3ec3..262fb2f 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
@@ -138,12 +138,15 @@
    * API values.
    */
   static int toPthreadPolicy(POLICY policy) {
-    switch(policy) {
-    case OTHER: return SCHED_OTHER; break;
-    case FIFO: return SCHED_FIFO; break;
-    case ROUND_ROBIN: return SCHED_RR; break;
-    default: return SCHED_OTHER; break;
+    switch (policy) {
+    case OTHER:
+      return SCHED_OTHER;
+    case FIFO:
+      return SCHED_FIFO;
+    case ROUND_ROBIN:
+      return SCHED_RR;
     }
+    return SCHED_OTHER;
   }
 
   /**
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 75a209e..a7c8393 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -154,10 +154,14 @@
     try {
       // Invoke the processor
       server_->getProcessor()->process(inputProtocol_, outputProtocol_);
-    } catch (TTransportException &x) {
-      fprintf(stderr, "Server::process %s\n", x.getMessage().c_str());
+    } catch (TTransportException &ttx) {
+      fprintf(stderr, "Server::process() %s\n", ttx.what());
       close();
-      return;    
+      return;
+    } catch (TException &x) {
+      fprintf(stderr, "Server::process() %s\n", x.what());
+      close();     
+      return;
     } catch (...) {
       fprintf(stderr, "Server::process() unknown exception\n");
       close();
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index faa572f..11b58b1 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -186,7 +186,7 @@
   TConnection(int socket, short eventFlags, TNonblockingServer *s) {
     readBuffer_ = (uint8_t*)malloc(1024);
     if (readBuffer_ == NULL) {
-      throw new facebook::thrift::Exception("Out of memory.");
+      throw new facebook::thrift::TException("Out of memory.");
     }
     readBufferSize_ = 1024;
     
diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp
index 3eb035e..b453ce6 100644
--- a/lib/cpp/src/server/TSimpleServer.cpp
+++ b/lib/cpp/src/server/TSimpleServer.cpp
@@ -21,7 +21,7 @@
     // Start the server listening
     serverTransport_->listen();
   } catch (TTransportException& ttx) {
-    cerr << "TSimpleServer::run() listen(): " << ttx.getMessage() << endl;
+    cerr << "TSimpleServer::run() listen(): " << ttx.what() << endl;
     return;
   }
 
@@ -32,16 +32,21 @@
       iot = transportFactory_->getIOTransports(client);
       iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
       try {
-        while (processor_->process(iop.first, iop.second)) {}
+        while (processor_->process(iop.first, iop.second)) {
+          // Peek ahead, is the remote side closed?
+          if (!iot.first->peek()) {
+            break;
+          }
+        }
       } catch (TTransportException& ttx) {
-        cerr << "TSimpleServer client died: " << ttx.getMessage() << endl;
+        cerr << "TSimpleServer client died: " << ttx.what() << endl;
       }
       iot.first->close();
       iot.second->close();
       client->close();
     }
   } catch (TTransportException& ttx) {
-    cerr << "TServerTransport died on accept: " << ttx.getMessage() << endl;
+    cerr << "TServerTransport died on accept: " << ttx.what() << endl;
   }
 
   // TODO(mcslee): Could this be a timeout case? Or always the real thing?
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
index 7885f0f..357152b 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -8,6 +8,7 @@
 namespace facebook { namespace thrift { namespace server { 
 
 using namespace std;
+using namespace facebook::thrift;
 using namespace facebook::thrift::concurrency;
 using namespace facebook::thrift::transport;
 
@@ -26,14 +27,18 @@
   ~Task() {}
     
   void run() {     
-    while(true) {
-      try {
-	processor_->process(input_, output_);
-      } catch (TTransportException& ttx) {
-        break;
-      } catch(...) {
-        break;
+    try {
+      while (processor_->process(input_, output_)) {
+        if (!input_->getInputTransport()->peek()) {
+          break;
+        }
       }
+    } catch (TTransportException& ttx) {
+      cerr << "TThreadPoolServer client died: " << ttx.what() << endl;
+    } catch (TException& x) {
+      cerr << "TThreadPoolServer exception: " << x.what() << endl;
+    } catch (...) {
+      cerr << "TThreadPoolServer uncaught exception." << endl;
     }
     input_->getInputTransport()->close();
     output_->getOutputTransport()->close();
@@ -68,7 +73,7 @@
     // Start the server listening
     serverTransport_->listen();
   } catch (TTransportException& ttx) {
-    cerr << "TThreadPoolServer::run() listen(): " << ttx.getMessage() << endl;
+    cerr << "TThreadPoolServer::run() listen(): " << ttx.what() << endl;
     return;
   }
   
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index de3bea7..8b9048c 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -43,6 +43,8 @@
   lingerOn_(1),
   lingerVal_(0),
   noDelay_(1) {
+  recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
+  recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
 }
 
 TSocket::TSocket(int socket) :
@@ -55,6 +57,8 @@
   lingerOn_(1),
   lingerVal_(0),
   noDelay_(1) {
+  recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
+  recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
 }
   
 TSocket::~TSocket() {
@@ -65,6 +69,20 @@
   return (socket_ > 0); 
 }
 
+bool TSocket::peek() {
+  if (!isOpen()) {
+    return false;
+  }
+  uint8_t buf;
+  int r = recv(socket_, &buf, 1, MSG_PEEK);
+  if (r == -1) {
+    perror("TSocket::peek()");
+    close();
+    throw TTransportException(TTX_UNKNOWN, "recv() ERROR:" + errno);
+  }
+  return (r > 0);
+}
+
 void TSocket::open() {
   // Create socket
   socket_ = socket(AF_INET, SOCK_STREAM, 0);
@@ -322,12 +340,14 @@
 
 void TSocket::setRecvTimeout(int ms) {
   recvTimeout_ = ms;
+  recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
+  recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
   if (socket_ <= 0) {
     return;
   }
 
-  struct timeval r = {(int)(recvTimeout_/1000),
-                      (int)((recvTimeout_%1000)*1000)};
+  // Copy because select may modify
+  struct timeval r = recvTimeval_;
   int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, &r, sizeof(r));
   if (ret == -1) {
     perror("TSocket::setRecvTimeout()");
diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h
index b946b6a..8137984 100644
--- a/lib/cpp/src/transport/TSocket.h
+++ b/lib/cpp/src/transport/TSocket.h
@@ -2,6 +2,7 @@
 #define _THRIFT_TRANSPORT_TSOCKET_H_ 1
 
 #include <string>
+#include <sys/time.h>
 
 #include "TTransport.h"
 #include "TServerSocket.h"
@@ -45,6 +46,11 @@
   bool isOpen();
 
   /**
+   * Calls select on the socket to see if there is more data available.
+   */
+  bool peek();
+
+  /**
    * Creates and opens the UNIX socket.
    *
    * @throws TTransportException If the socket could not connect
@@ -131,6 +137,9 @@
 
   /** Nodelay */
   bool noDelay_;
+
+  /** Recv timeout timeval */
+  struct timeval recvTimeval_;
 };
 
 }}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TTransport.h b/lib/cpp/src/transport/TTransport.h
index 7b4cbe1..5e4ae6b 100644
--- a/lib/cpp/src/transport/TTransport.h
+++ b/lib/cpp/src/transport/TTransport.h
@@ -24,7 +24,21 @@
   /**
    * Whether this transport is open.
    */
-  virtual bool isOpen() { return false; }
+  virtual bool isOpen() {
+    return false;
+  }
+
+  /**
+   * Tests whether there is more data to read or if the remote side is
+   * still open. By default this is true whenever the transport is open,
+   * but implementations should add logic to test for this condition where
+   * possible (i.e. on a socket).
+   * This is used by a server to check if it should listen for another
+   * request.
+   */
+  virtual bool peek() {
+    return isOpen();
+  }
 
   /**
    * Opens the transport for communications.
diff --git a/lib/cpp/src/transport/TTransportException.h b/lib/cpp/src/transport/TTransportException.h
index c54084d..e02eb70 100644
--- a/lib/cpp/src/transport/TTransportException.h
+++ b/lib/cpp/src/transport/TTransportException.h
@@ -23,21 +23,25 @@
  *
  * @author Mark Slee <mcslee@facebook.com>
  */
-class TTransportException {
+class TTransportException : public facebook::thrift::TException {
  public:
   TTransportException() :
-    type_(TTX_UNKNOWN), message_() {}
+    facebook::thrift::TException(),
+    type_(TTX_UNKNOWN) {}
 
   TTransportException(TTransportExceptionType type) :
-    type_(type), message_() {}
+    facebook::thrift::TException(), 
+    type_(type) {}
 
-  TTransportException(std::string message) :
-    type_(TTX_UNKNOWN), message_(message) {}
+  TTransportException(const std::string message) :
+    facebook::thrift::TException(message),
+    type_(TTX_UNKNOWN) {}
 
-  TTransportException(TTransportExceptionType type, std::string message) :
-    type_(type), message_(message) {}
+  TTransportException(TTransportExceptionType type, const std::string message) :
+    facebook::thrift::TException(message),
+    type_(type) {}
 
-  ~TTransportException() {}
+  virtual ~TTransportException() throw() {}
 
   /**
    * Returns an error code that provides information about the type of error
@@ -45,21 +49,14 @@
    *
    * @return Error code
    */
-  TTransportExceptionType getType() { return type_; }
+  TTransportExceptionType getType() {
+    return type_;
+  }
  
-  /**
-   * Returns an informative message about what caused this error.
-   *
-   * @return Error string
-   */
-  const std::string& getMessage() { return message_; }
-
  protected:
   /** Error code */
   TTransportExceptionType type_;
 
-  /** Description */
-  std::string message_;
 };
 
 }}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp
index d9f4775..02454e3 100644
--- a/lib/cpp/src/transport/TTransportUtils.cpp
+++ b/lib/cpp/src/transport/TTransportUtils.cpp
@@ -16,7 +16,6 @@
       buf += rLen_-rPos_;
     }    
     // Get more from underlying transport up to buffer size
-    // TODO: should this be a readAll?
     rLen_ = transport_->read(rBuf_, rBufSize_);
     rPos_ = 0;
   }
diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h
index 8d8d093..a8003cf 100644
--- a/lib/cpp/src/transport/TTransportUtils.h
+++ b/lib/cpp/src/transport/TTransportUtils.h
@@ -71,6 +71,14 @@
     return transport_->isOpen();
   }
   
+  bool peek() {    
+    if (rPos_ >= rLen_) {
+      rLen_ = transport_->read(rBuf_, rBufSize_);
+      rPos_ = 0;
+    }
+    return (rLen_ > rPos_);
+  }
+
   void open() {
     transport_->open();
   }
@@ -177,6 +185,13 @@
     return transport_->isOpen();
   }
 
+  bool peek() {
+    if (rPos_ < rLen_) {
+      return true;
+    }
+    return transport_->peek();
+  }
+
   void close() {
     transport_->close();
   }
@@ -260,7 +275,10 @@
     return true;
   }
 
- 
+  bool peek() {
+    return (rPos_ < wPos_);
+  }
+
   void open() {}
 
   void close() {}