Thrift: add C++ peek() method to transports and rename Exception to TException
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664876 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/Thrift.h b/lib/cpp/src/Thrift.h
index 92143cc..e497255 100644
--- a/lib/cpp/src/Thrift.h
+++ b/lib/cpp/src/Thrift.h
@@ -14,15 +14,21 @@
namespace facebook { namespace thrift {
-class Exception : public std::exception {
+class TException : public std::exception {
public:
- Exception(const std::string message) :
+ TException() {}
+
+ TException(const std::string message) :
message_(message) {}
- ~Exception() throw () {}
+ ~TException() throw() {}
const char* what() {
- return message_.c_str();
+ if (message_.empty()) {
+ return "Default TException.";
+ } else {
+ return message_.c_str();
+ }
}
private:
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
index 74a3ec3..262fb2f 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
@@ -138,12 +138,15 @@
* API values.
*/
static int toPthreadPolicy(POLICY policy) {
- switch(policy) {
- case OTHER: return SCHED_OTHER; break;
- case FIFO: return SCHED_FIFO; break;
- case ROUND_ROBIN: return SCHED_RR; break;
- default: return SCHED_OTHER; break;
+ switch (policy) {
+ case OTHER:
+ return SCHED_OTHER;
+ case FIFO:
+ return SCHED_FIFO;
+ case ROUND_ROBIN:
+ return SCHED_RR;
}
+ return SCHED_OTHER;
}
/**
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 75a209e..a7c8393 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -154,10 +154,14 @@
try {
// Invoke the processor
server_->getProcessor()->process(inputProtocol_, outputProtocol_);
- } catch (TTransportException &x) {
- fprintf(stderr, "Server::process %s\n", x.getMessage().c_str());
+ } catch (TTransportException &ttx) {
+ fprintf(stderr, "Server::process() %s\n", ttx.what());
close();
- return;
+ return;
+ } catch (TException &x) {
+ fprintf(stderr, "Server::process() %s\n", x.what());
+ close();
+ return;
} catch (...) {
fprintf(stderr, "Server::process() unknown exception\n");
close();
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index faa572f..11b58b1 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -186,7 +186,7 @@
TConnection(int socket, short eventFlags, TNonblockingServer *s) {
readBuffer_ = (uint8_t*)malloc(1024);
if (readBuffer_ == NULL) {
- throw new facebook::thrift::Exception("Out of memory.");
+ throw new facebook::thrift::TException("Out of memory.");
}
readBufferSize_ = 1024;
diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp
index 3eb035e..b453ce6 100644
--- a/lib/cpp/src/server/TSimpleServer.cpp
+++ b/lib/cpp/src/server/TSimpleServer.cpp
@@ -21,7 +21,7 @@
// Start the server listening
serverTransport_->listen();
} catch (TTransportException& ttx) {
- cerr << "TSimpleServer::run() listen(): " << ttx.getMessage() << endl;
+ cerr << "TSimpleServer::run() listen(): " << ttx.what() << endl;
return;
}
@@ -32,16 +32,21 @@
iot = transportFactory_->getIOTransports(client);
iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
try {
- while (processor_->process(iop.first, iop.second)) {}
+ while (processor_->process(iop.first, iop.second)) {
+ // Peek ahead, is the remote side closed?
+ if (!iot.first->peek()) {
+ break;
+ }
+ }
} catch (TTransportException& ttx) {
- cerr << "TSimpleServer client died: " << ttx.getMessage() << endl;
+ cerr << "TSimpleServer client died: " << ttx.what() << endl;
}
iot.first->close();
iot.second->close();
client->close();
}
} catch (TTransportException& ttx) {
- cerr << "TServerTransport died on accept: " << ttx.getMessage() << endl;
+ cerr << "TServerTransport died on accept: " << ttx.what() << endl;
}
// TODO(mcslee): Could this be a timeout case? Or always the real thing?
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
index 7885f0f..357152b 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -8,6 +8,7 @@
namespace facebook { namespace thrift { namespace server {
using namespace std;
+using namespace facebook::thrift;
using namespace facebook::thrift::concurrency;
using namespace facebook::thrift::transport;
@@ -26,14 +27,18 @@
~Task() {}
void run() {
- while(true) {
- try {
- processor_->process(input_, output_);
- } catch (TTransportException& ttx) {
- break;
- } catch(...) {
- break;
+ try {
+ while (processor_->process(input_, output_)) {
+ if (!input_->getInputTransport()->peek()) {
+ break;
+ }
}
+ } catch (TTransportException& ttx) {
+ cerr << "TThreadPoolServer client died: " << ttx.what() << endl;
+ } catch (TException& x) {
+ cerr << "TThreadPoolServer exception: " << x.what() << endl;
+ } catch (...) {
+ cerr << "TThreadPoolServer uncaught exception." << endl;
}
input_->getInputTransport()->close();
output_->getOutputTransport()->close();
@@ -68,7 +73,7 @@
// Start the server listening
serverTransport_->listen();
} catch (TTransportException& ttx) {
- cerr << "TThreadPoolServer::run() listen(): " << ttx.getMessage() << endl;
+ cerr << "TThreadPoolServer::run() listen(): " << ttx.what() << endl;
return;
}
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index de3bea7..8b9048c 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -43,6 +43,8 @@
lingerOn_(1),
lingerVal_(0),
noDelay_(1) {
+ recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
+ recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
}
TSocket::TSocket(int socket) :
@@ -55,6 +57,8 @@
lingerOn_(1),
lingerVal_(0),
noDelay_(1) {
+ recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
+ recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
}
TSocket::~TSocket() {
@@ -65,6 +69,20 @@
return (socket_ > 0);
}
+/**
+ * Checks, without consuming any data, whether at least one byte can be
+ * read from the socket.
+ *
+ * @return false if the socket is not open or recv() returned 0;
+ *         true if recv() reported at least one byte available.
+ * @throws TTransportException (TTX_UNKNOWN) if recv() fails. The socket
+ *         is closed before the exception is thrown.
+ */
+bool TSocket::peek() {
+  if (!isOpen()) {
+    return false;
+  }
+  uint8_t buf;
+  int r = recv(socket_, &buf, 1, MSG_PEEK);
+  if (r == -1) {
+    // Save errno first: perror() and close() may overwrite it.
+    int errno_copy = errno;
+    perror("TSocket::peek()");
+    close();
+    // Format the error code explicitly. Adding an int to a string literal
+    // is pointer arithmetic on the literal, not string concatenation.
+    char message[32];
+    snprintf(message, sizeof(message), "recv() ERROR:%d", errno_copy);
+    throw TTransportException(TTX_UNKNOWN, message);
+  }
+  return (r > 0);
+}
+
void TSocket::open() {
// Create socket
socket_ = socket(AF_INET, SOCK_STREAM, 0);
@@ -322,12 +340,14 @@
void TSocket::setRecvTimeout(int ms) {
recvTimeout_ = ms;
+ recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
+ recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
if (socket_ <= 0) {
return;
}
- struct timeval r = {(int)(recvTimeout_/1000),
- (int)((recvTimeout_%1000)*1000)};
+ // Copy because select may modify
+ struct timeval r = recvTimeval_;
int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, &r, sizeof(r));
if (ret == -1) {
perror("TSocket::setRecvTimeout()");
diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h
index b946b6a..8137984 100644
--- a/lib/cpp/src/transport/TSocket.h
+++ b/lib/cpp/src/transport/TSocket.h
@@ -2,6 +2,7 @@
#define _THRIFT_TRANSPORT_TSOCKET_H_ 1
#include <string>
+#include <sys/time.h>
#include "TTransport.h"
#include "TServerSocket.h"
@@ -45,6 +46,11 @@
bool isOpen();
/**
+ * Peeks at the socket (recv() with MSG_PEEK) to see if there is more data available.
+ */
+ bool peek();
+
+ /**
* Creates and opens the UNIX socket.
*
* @throws TTransportException If the socket could not connect
@@ -131,6 +137,9 @@
/** Nodelay */
bool noDelay_;
+
+ /** Recv timeout timeval */
+ struct timeval recvTimeval_;
};
}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TTransport.h b/lib/cpp/src/transport/TTransport.h
index 7b4cbe1..5e4ae6b 100644
--- a/lib/cpp/src/transport/TTransport.h
+++ b/lib/cpp/src/transport/TTransport.h
@@ -24,7 +24,21 @@
/**
* Whether this transport is open.
*/
- virtual bool isOpen() { return false; }
+ virtual bool isOpen() {
+ return false;
+ }
+
+  /**
+   * Reports whether a server should expect more from this transport:
+   * either more data to read, or a remote side that is still open.
+   * This base implementation simply answers with isOpen(). Implementations
+   * that can test the condition directly (for example a socket) should
+   * provide their own logic. Servers use this to decide whether to listen
+   * for another request.
+   */
+  virtual bool peek() {
+    const bool stillOpen = isOpen();
+    return stillOpen;
+  }
/**
* Opens the transport for communications.
diff --git a/lib/cpp/src/transport/TTransportException.h b/lib/cpp/src/transport/TTransportException.h
index c54084d..e02eb70 100644
--- a/lib/cpp/src/transport/TTransportException.h
+++ b/lib/cpp/src/transport/TTransportException.h
@@ -23,21 +23,25 @@
*
* @author Mark Slee <mcslee@facebook.com>
*/
-class TTransportException {
+class TTransportException : public facebook::thrift::TException {
public:
TTransportException() :
- type_(TTX_UNKNOWN), message_() {}
+ facebook::thrift::TException(),
+ type_(TTX_UNKNOWN) {}
TTransportException(TTransportExceptionType type) :
- type_(type), message_() {}
+ facebook::thrift::TException(),
+ type_(type) {}
- TTransportException(std::string message) :
- type_(TTX_UNKNOWN), message_(message) {}
+ TTransportException(const std::string message) :
+ facebook::thrift::TException(message),
+ type_(TTX_UNKNOWN) {}
- TTransportException(TTransportExceptionType type, std::string message) :
- type_(type), message_(message) {}
+ TTransportException(TTransportExceptionType type, const std::string message) :
+ facebook::thrift::TException(message),
+ type_(type) {}
- ~TTransportException() {}
+ virtual ~TTransportException() throw() {}
/**
* Returns an error code that provides information about the type of error
@@ -45,21 +49,14 @@
*
* @return Error code
*/
- TTransportExceptionType getType() { return type_; }
+ TTransportExceptionType getType() {
+ return type_;
+ }
- /**
- * Returns an informative message about what caused this error.
- *
- * @return Error string
- */
- const std::string& getMessage() { return message_; }
-
protected:
/** Error code */
TTransportExceptionType type_;
- /** Description */
- std::string message_;
};
}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp
index d9f4775..02454e3 100644
--- a/lib/cpp/src/transport/TTransportUtils.cpp
+++ b/lib/cpp/src/transport/TTransportUtils.cpp
@@ -16,7 +16,6 @@
buf += rLen_-rPos_;
}
// Get more from underlying transport up to buffer size
- // TODO: should this be a readAll?
rLen_ = transport_->read(rBuf_, rBufSize_);
rPos_ = 0;
}
diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h
index 8d8d093..a8003cf 100644
--- a/lib/cpp/src/transport/TTransportUtils.h
+++ b/lib/cpp/src/transport/TTransportUtils.h
@@ -71,6 +71,14 @@
return transport_->isOpen();
}
+  /**
+   * Reports whether buffered read data is available. When the read
+   * position has reached the end of the buffered data, the buffer is
+   * refilled with one read() on the underlying transport (up to
+   * rBufSize_ bytes) before answering.
+   */
+  bool peek() {
+    // Unread bytes are still buffered: no need to touch the transport.
+    if (rPos_ < rLen_) {
+      return true;
+    }
+    // Refill first, then rewind, so the state is untouched if read() throws.
+    rLen_ = transport_->read(rBuf_, rBufSize_);
+    rPos_ = 0;
+    return (rLen_ > 0);
+  }
+
void open() {
transport_->open();
}
@@ -177,6 +185,13 @@
return transport_->isOpen();
}
+  /**
+   * Returns true if the read position has not yet reached the end of the
+   * buffered data; otherwise defers to the underlying transport's peek().
+   */
+  bool peek() {
+    return (rPos_ < rLen_) || transport_->peek();
+  }
+
void close() {
transport_->close();
}
@@ -260,7 +275,10 @@
return true;
}
-
+  /**
+   * Returns true while the read position is behind the write position.
+   */
+  bool peek() {
+    const bool readBehindWrite = (rPos_ < wPos_);
+    return readBehindWrite;
+  }
+
void open() {}
void close() {}