Thrift: C++ peek() method and TException not Exception
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664876 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/compiler/cpp/src/generate/t_cpp_generator.cc b/compiler/cpp/src/generate/t_cpp_generator.cc
index 3a38214..01345c7 100644
--- a/compiler/cpp/src/generate/t_cpp_generator.cc
+++ b/compiler/cpp/src/generate/t_cpp_generator.cc
@@ -845,7 +845,7 @@
indent() << "if (mtype != facebook::thrift::protocol::T_REPLY || fname.compare(\"" << (*f_iter)->get_name() << "\") != 0) {" << endl;
indent_up();
f_service_ <<
- indent() << "throw facebook::thrift::Exception(\"Unexpected message type, name, or id\");" << endl;
+ indent() << "throw facebook::thrift::TException(\"Unexpected message type, name, or id\");" << endl;
indent_down();
f_service_ <<
@@ -879,7 +879,7 @@
"return;" << endl;
} else {
f_service_ <<
- indent() << "throw facebook::thrift::Exception(\"" << (*f_iter)->get_name() << " failed: unknown result\");" << endl;
+ indent() << "throw facebook::thrift::TException(\"" << (*f_iter)->get_name() << " failed: unknown result\");" << endl;
}
// Close function
@@ -985,7 +985,7 @@
indent() << "iprot->readMessageBegin(fname, mtype, seqid);" << endl <<
endl <<
indent() << "if (mtype != facebook::thrift::protocol::T_CALL) {" << endl <<
- indent() << " throw facebook::thrift::Exception(\"Unexpected message type\");" << endl <<
+ indent() << " throw facebook::thrift::TException(\"Unexpected message type\");" << endl <<
indent() << "}" << endl <<
endl <<
indent() << "return process_fn(iprot, oprot, fname, seqid);" <<
@@ -1007,7 +1007,7 @@
indent() << "if (pfn == processMap_.end()) {" << endl;
if (extends.empty()) {
f_service_ <<
- indent() << " throw facebook::thrift::Exception(\"Unknown function name: '\"+fname+\"'\");" << endl;
+ indent() << " throw facebook::thrift::TException(\"Unknown function name: '\"+fname+\"'\");" << endl;
} else {
f_service_ <<
indent() << " return " << extends << "Processor::process_fn(iprot, oprot, fname, seqid);" << endl;
diff --git a/lib/cpp/src/Thrift.h b/lib/cpp/src/Thrift.h
index 92143cc..e497255 100644
--- a/lib/cpp/src/Thrift.h
+++ b/lib/cpp/src/Thrift.h
@@ -14,15 +14,21 @@
namespace facebook { namespace thrift {
-class Exception : public std::exception {
+class TException : public std::exception {
public:
- Exception(const std::string message) :
+ TException() {}
+
+ TException(const std::string message) :
message_(message) {}
- ~Exception() throw () {}
+ ~TException() throw() {}
const char* what() {
- return message_.c_str();
+ if (message_.empty()) {
+ return "Default TException.";
+ } else {
+ return message_.c_str();
+ }
}
private:
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
index 74a3ec3..262fb2f 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
@@ -138,12 +138,15 @@
* API values.
*/
static int toPthreadPolicy(POLICY policy) {
- switch(policy) {
- case OTHER: return SCHED_OTHER; break;
- case FIFO: return SCHED_FIFO; break;
- case ROUND_ROBIN: return SCHED_RR; break;
- default: return SCHED_OTHER; break;
+ switch (policy) {
+ case OTHER:
+ return SCHED_OTHER;
+ case FIFO:
+ return SCHED_FIFO;
+ case ROUND_ROBIN:
+ return SCHED_RR;
}
+ return SCHED_OTHER;
}
/**
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 75a209e..a7c8393 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -154,10 +154,14 @@
try {
// Invoke the processor
server_->getProcessor()->process(inputProtocol_, outputProtocol_);
- } catch (TTransportException &x) {
- fprintf(stderr, "Server::process %s\n", x.getMessage().c_str());
+ } catch (TTransportException &ttx) {
+ fprintf(stderr, "Server::process() %s\n", ttx.what());
close();
- return;
+ return;
+ } catch (TException &x) {
+ fprintf(stderr, "Server::process() %s\n", x.what());
+ close();
+ return;
} catch (...) {
fprintf(stderr, "Server::process() unknown exception\n");
close();
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index faa572f..11b58b1 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -186,7 +186,7 @@
TConnection(int socket, short eventFlags, TNonblockingServer *s) {
readBuffer_ = (uint8_t*)malloc(1024);
if (readBuffer_ == NULL) {
- throw new facebook::thrift::Exception("Out of memory.");
+ throw new facebook::thrift::TException("Out of memory.");
}
readBufferSize_ = 1024;
diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp
index 3eb035e..b453ce6 100644
--- a/lib/cpp/src/server/TSimpleServer.cpp
+++ b/lib/cpp/src/server/TSimpleServer.cpp
@@ -21,7 +21,7 @@
// Start the server listening
serverTransport_->listen();
} catch (TTransportException& ttx) {
- cerr << "TSimpleServer::run() listen(): " << ttx.getMessage() << endl;
+ cerr << "TSimpleServer::run() listen(): " << ttx.what() << endl;
return;
}
@@ -32,16 +32,21 @@
iot = transportFactory_->getIOTransports(client);
iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
try {
- while (processor_->process(iop.first, iop.second)) {}
+ while (processor_->process(iop.first, iop.second)) {
+ // Peek ahead, is the remote side closed?
+ if (!iot.first->peek()) {
+ break;
+ }
+ }
} catch (TTransportException& ttx) {
- cerr << "TSimpleServer client died: " << ttx.getMessage() << endl;
+ cerr << "TSimpleServer client died: " << ttx.what() << endl;
}
iot.first->close();
iot.second->close();
client->close();
}
} catch (TTransportException& ttx) {
- cerr << "TServerTransport died on accept: " << ttx.getMessage() << endl;
+ cerr << "TServerTransport died on accept: " << ttx.what() << endl;
}
// TODO(mcslee): Could this be a timeout case? Or always the real thing?
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
index 7885f0f..357152b 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -8,6 +8,7 @@
namespace facebook { namespace thrift { namespace server {
using namespace std;
+using namespace facebook::thrift;
using namespace facebook::thrift::concurrency;
using namespace facebook::thrift::transport;
@@ -26,14 +27,18 @@
~Task() {}
void run() {
- while(true) {
- try {
- processor_->process(input_, output_);
- } catch (TTransportException& ttx) {
- break;
- } catch(...) {
- break;
+ try {
+ while (processor_->process(input_, output_)) {
+ if (!input_->getInputTransport()->peek()) {
+ break;
+ }
}
+ } catch (TTransportException& ttx) {
+ cerr << "TThreadPoolServer client died: " << ttx.what() << endl;
+ } catch (TException& x) {
+ cerr << "TThreadPoolServer exception: " << x.what() << endl;
+ } catch (...) {
+ cerr << "TThreadPoolServer uncaught exception." << endl;
}
input_->getInputTransport()->close();
output_->getOutputTransport()->close();
@@ -68,7 +73,7 @@
// Start the server listening
serverTransport_->listen();
} catch (TTransportException& ttx) {
- cerr << "TThreadPoolServer::run() listen(): " << ttx.getMessage() << endl;
+ cerr << "TThreadPoolServer::run() listen(): " << ttx.what() << endl;
return;
}
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index de3bea7..8b9048c 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -43,6 +43,8 @@
lingerOn_(1),
lingerVal_(0),
noDelay_(1) {
+ recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
+ recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
}
TSocket::TSocket(int socket) :
@@ -55,6 +57,8 @@
lingerOn_(1),
lingerVal_(0),
noDelay_(1) {
+ recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
+ recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
}
TSocket::~TSocket() {
@@ -65,6 +69,20 @@
return (socket_ > 0);
}
+bool TSocket::peek() {
+ if (!isOpen()) {
+ return false;
+ }
+ uint8_t buf;
+ int r = recv(socket_, &buf, 1, MSG_PEEK);
+ if (r == -1) {
+ perror("TSocket::peek()");
+ close();
+ throw TTransportException(TTX_UNKNOWN, "recv() ERROR:" + errno);
+ }
+ return (r > 0);
+}
+
void TSocket::open() {
// Create socket
socket_ = socket(AF_INET, SOCK_STREAM, 0);
@@ -322,12 +340,14 @@
void TSocket::setRecvTimeout(int ms) {
recvTimeout_ = ms;
+ recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
+ recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
if (socket_ <= 0) {
return;
}
- struct timeval r = {(int)(recvTimeout_/1000),
- (int)((recvTimeout_%1000)*1000)};
+ // Copy because select may modify
+ struct timeval r = recvTimeval_;
int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, &r, sizeof(r));
if (ret == -1) {
perror("TSocket::setRecvTimeout()");
diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h
index b946b6a..8137984 100644
--- a/lib/cpp/src/transport/TSocket.h
+++ b/lib/cpp/src/transport/TSocket.h
@@ -2,6 +2,7 @@
#define _THRIFT_TRANSPORT_TSOCKET_H_ 1
#include <string>
+#include <sys/time.h>
#include "TTransport.h"
#include "TServerSocket.h"
@@ -45,6 +46,11 @@
bool isOpen();
/**
+ * Calls select on the socket to see if there is more data available.
+ */
+ bool peek();
+
+ /**
* Creates and opens the UNIX socket.
*
* @throws TTransportException If the socket could not connect
@@ -131,6 +137,9 @@
/** Nodelay */
bool noDelay_;
+
+ /** Recv timeout timeval */
+ struct timeval recvTimeval_;
};
}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TTransport.h b/lib/cpp/src/transport/TTransport.h
index 7b4cbe1..5e4ae6b 100644
--- a/lib/cpp/src/transport/TTransport.h
+++ b/lib/cpp/src/transport/TTransport.h
@@ -24,7 +24,21 @@
/**
* Whether this transport is open.
*/
- virtual bool isOpen() { return false; }
+ virtual bool isOpen() {
+ return false;
+ }
+
+ /**
+ * Tests whether there is more data to read or if the remote side is
+ * still open. By default this is true whenever the transport is open,
+ * but implementations should add logic to test for this condition where
+ * possible (i.e. on a socket).
+ * This is used by a server to check if it should listen for another
+ * request.
+ */
+ virtual bool peek() {
+ return isOpen();
+ }
/**
* Opens the transport for communications.
diff --git a/lib/cpp/src/transport/TTransportException.h b/lib/cpp/src/transport/TTransportException.h
index c54084d..e02eb70 100644
--- a/lib/cpp/src/transport/TTransportException.h
+++ b/lib/cpp/src/transport/TTransportException.h
@@ -23,21 +23,25 @@
*
* @author Mark Slee <mcslee@facebook.com>
*/
-class TTransportException {
+class TTransportException : public facebook::thrift::TException {
public:
TTransportException() :
- type_(TTX_UNKNOWN), message_() {}
+ facebook::thrift::TException(),
+ type_(TTX_UNKNOWN) {}
TTransportException(TTransportExceptionType type) :
- type_(type), message_() {}
+ facebook::thrift::TException(),
+ type_(type) {}
- TTransportException(std::string message) :
- type_(TTX_UNKNOWN), message_(message) {}
+ TTransportException(const std::string message) :
+ facebook::thrift::TException(message),
+ type_(TTX_UNKNOWN) {}
- TTransportException(TTransportExceptionType type, std::string message) :
- type_(type), message_(message) {}
+ TTransportException(TTransportExceptionType type, const std::string message) :
+ facebook::thrift::TException(message),
+ type_(type) {}
- ~TTransportException() {}
+ virtual ~TTransportException() throw() {}
/**
* Returns an error code that provides information about the type of error
@@ -45,21 +49,14 @@
*
* @return Error code
*/
- TTransportExceptionType getType() { return type_; }
+ TTransportExceptionType getType() {
+ return type_;
+ }
- /**
- * Returns an informative message about what caused this error.
- *
- * @return Error string
- */
- const std::string& getMessage() { return message_; }
-
protected:
/** Error code */
TTransportExceptionType type_;
- /** Description */
- std::string message_;
};
}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp
index d9f4775..02454e3 100644
--- a/lib/cpp/src/transport/TTransportUtils.cpp
+++ b/lib/cpp/src/transport/TTransportUtils.cpp
@@ -16,7 +16,6 @@
buf += rLen_-rPos_;
}
// Get more from underlying transport up to buffer size
- // TODO: should this be a readAll?
rLen_ = transport_->read(rBuf_, rBufSize_);
rPos_ = 0;
}
diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h
index 8d8d093..a8003cf 100644
--- a/lib/cpp/src/transport/TTransportUtils.h
+++ b/lib/cpp/src/transport/TTransportUtils.h
@@ -71,6 +71,14 @@
return transport_->isOpen();
}
+ bool peek() {
+ if (rPos_ >= rLen_) {
+ rLen_ = transport_->read(rBuf_, rBufSize_);
+ rPos_ = 0;
+ }
+ return (rLen_ > rPos_);
+ }
+
void open() {
transport_->open();
}
@@ -177,6 +185,13 @@
return transport_->isOpen();
}
+ bool peek() {
+ if (rPos_ < rLen_) {
+ return true;
+ }
+ return transport_->peek();
+ }
+
void close() {
transport_->close();
}
@@ -260,7 +275,10 @@
return true;
}
-
+ bool peek() {
+ return (rPos_ < wPos_);
+ }
+
void open() {}
void close() {}
diff --git a/test/cpp/src/TestClient.cpp b/test/cpp/src/TestClient.cpp
index e508b0f..db00347 100644
--- a/test/cpp/src/TestClient.cpp
+++ b/test/cpp/src/TestClient.cpp
@@ -88,7 +88,7 @@
try {
transport->open();
} catch (TTransportException& ttx) {
- printf("Connect failed: %s\n", ttx.getMessage().c_str());
+ printf("Connect failed: %s\n", ttx.what());
continue;
}
@@ -373,7 +373,7 @@
testClient.testException("Xception");
printf(" void\nFAILURE\n");
- } catch(Xception& e) {
+ } catch(Xception& e) {
printf(" {%u, \"%s\"}\n", e.errorCode, e.message.c_str());
}
diff --git a/test/cpp/src/main.cpp b/test/cpp/src/main.cpp
index d9643c3..8344a88 100644
--- a/test/cpp/src/main.cpp
+++ b/test/cpp/src/main.cpp
@@ -2,6 +2,7 @@
#include <concurrency/PosixThreadFactory.h>
#include <concurrency/Monitor.h>
#include <concurrency/Util.h>
+#include <concurrency/Mutex.h>
#include <protocol/TBinaryProtocol.h>
#include <server/TSimpleServer.h>
#include <server/TThreadPoolServer.h>
@@ -18,19 +19,57 @@
#include <stdexcept>
#include <sstream>
+#include <map>
+#include <ext/hash_map>
+using __gnu_cxx::hash_map;
+using __gnu_cxx::hash;
+
using namespace std;
using namespace facebook::thrift;
using namespace facebook::thrift::protocol;
using namespace facebook::thrift::transport;
using namespace facebook::thrift::server;
+using namespace facebook::thrift::concurrency;
using namespace test::stress;
+struct eqstr {
+ bool operator()(const char* s1, const char* s2) const {
+ return strcmp(s1, s2) == 0;
+ }
+};
+
+struct ltstr {
+ bool operator()(const char* s1, const char* s2) const {
+ return strcmp(s1, s2) < 0;
+ }
+};
+
+
+// typedef hash_map<const char*, int, hash<const char*>, eqstr> count_map;
+typedef map<const char*, int, ltstr> count_map;
+
class Server : public ServiceIf {
public:
- Server() {};
- void echoVoid() {return;}
+ Server() {}
+
+ void count(const char* method) {
+ MutexMonitor m(lock_);
+ int ct = counts_[method];
+ counts_[method] = ++ct;
+ }
+
+ void echoVoid() {
+ count("echoVoid");
+ return;
+ }
+
+ count_map getCount() {
+ MutexMonitor m(lock_);
+ return counts_;
+ }
+
int8_t echoByte(int8_t arg) {return arg;}
int32_t echoI32(int32_t arg) {return arg;}
int64_t echoI64(int64_t arg) {return arg;}
@@ -38,6 +77,11 @@
vector<int8_t> echoList(vector<int8_t> arg) {return arg;}
set<int8_t> echoSet(set<int8_t> arg) {return arg;}
map<int8_t, int8_t> echoMap(map<int8_t, int8_t> arg) {return arg;}
+
+private:
+ count_map counts_;
+ Mutex lock_;
+
};
class ClientThread: public Runnable {
@@ -252,10 +296,10 @@
shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
- if(runServer) {
+ // Dispatcher
+ shared_ptr<Server> serviceHandler(new Server());
- // Dispatcher
- shared_ptr<Server> serviceHandler(new Server());
+ if(runServer) {
shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
@@ -390,6 +434,11 @@
cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl;
+ count_map count = serviceHandler->getCount();
+ count_map::iterator iter;
+ for (iter = count.begin(); iter != count.end(); ++iter) {
+ printf("%s => %d\n", iter->first, iter->second);
+ }
cerr << "done." << endl;
}