THRIFT: generic output handler
Summary: I'm tired of getting output from thrift via perror AND exceptions, so
this class allows the client to set an alternate (or empty) handler for error
output
Reviewed By: mcslee
Test Plan: I ran on the worker with the default, got output via perror, then
overloaded with my own function and got output via syslog and then NULL
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665131 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/Thrift.cpp b/lib/cpp/src/Thrift.cpp
index 0f3ad88..186c5af 100644
--- a/lib/cpp/src/Thrift.cpp
+++ b/lib/cpp/src/Thrift.cpp
@@ -9,6 +9,8 @@
namespace facebook { namespace thrift {
+TOutput GlobalOutput;
+
uint32_t TApplicationException::read(facebook::thrift::protocol::TProtocol* iprot) {
uint32_t xfer = 0;
std::string fname;
diff --git a/lib/cpp/src/Thrift.h b/lib/cpp/src/Thrift.h
index 2cb656b..07ccb97 100644
--- a/lib/cpp/src/Thrift.h
+++ b/lib/cpp/src/Thrift.h
@@ -26,6 +26,24 @@
namespace facebook { namespace thrift {
+class TOutput{
+public:
+ TOutput() : f_(perror) {}
+
+ inline void setOutputFunction(void (*function)(const char *)){
+ f_ = function;
+ }
+
+ inline void operator()(const char *message){
+ f_(message);
+ }
+
+private:
+ void (*f_)(const char *);
+};
+
+extern TOutput GlobalOutput;
+
namespace protocol {
class TProtocol;
}
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 2b910a5..6337806 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -62,7 +62,7 @@
}
readBuffer_ = (uint8_t*)realloc(readBuffer_, readBufferSize_);
if (readBuffer_ == NULL) {
- perror("TConnection::workSocket() realloc");
+ GlobalOutput("TConnection::workSocket() realloc");
close();
return;
}
@@ -91,7 +91,7 @@
}
if (errno != ECONNRESET) {
- perror("TConnection::workSocket() recv -1");
+ GlobalOutput("TConnection::workSocket() recv -1");
}
}
@@ -127,7 +127,7 @@
return;
}
if (errno != EPIPE) {
- perror("TConnection::workSocket() send -1");
+ GlobalOutput("TConnection::workSocket() send -1");
}
close();
return;
@@ -302,7 +302,7 @@
// Delete a previously existing event
if (eventFlags_ != 0) {
if (event_del(&event_) == -1) {
- perror("TConnection::setFlags event_del");
+ GlobalOutput("TConnection::setFlags event_del");
return;
}
}
@@ -341,7 +341,7 @@
// Add the event
if (event_add(&event_, 0) == -1) {
- perror("TConnection::setFlags(): coult not event_add");
+ GlobalOutput("TConnection::setFlags(): coult not event_add");
}
}
@@ -351,7 +351,7 @@
void TConnection::close() {
// Delete the registered libevent
if (event_del(&event_) == -1) {
- perror("TConnection::close() event_del");
+ GlobalOutput("TConnection::close() event_del");
}
// Close the socket
@@ -415,7 +415,7 @@
int flags;
if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
- perror("thriftServerEventHandler: set O_NONBLOCK");
+ GlobalOutput("thriftServerEventHandler: set O_NONBLOCK");
close(clientSocket);
return;
}
@@ -438,7 +438,7 @@
// Done looping accept, now we have to make sure the error is due to
// blocking. Any other error is a problem
if (errno != EAGAIN && errno != EWOULDBLOCK) {
- perror("thriftServerEventHandler: accept()");
+ GlobalOutput("thriftServerEventHandler: accept()");
}
}
@@ -459,7 +459,7 @@
// Create the server socket
serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
if (serverSocket_ == -1) {
- perror("TNonblockingServer::serve() socket() -1");
+ GlobalOutput("TNonblockingServer::serve() socket() -1");
return;
}
@@ -467,7 +467,7 @@
int flags;
if ((flags = fcntl(serverSocket_, F_GETFL, 0)) < 0 ||
fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK) < 0) {
- perror("TNonblockingServer::serve() O_NONBLOCK");
+ GlobalOutput("TNonblockingServer::serve() O_NONBLOCK");
::close(serverSocket_);
return;
}
@@ -496,13 +496,13 @@
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(serverSocket_, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
- perror("TNonblockingServer::serve() bind");
+ GlobalOutput("TNonblockingServer::serve() bind");
close(serverSocket_);
return;
}
if (listen(serverSocket_, LISTEN_BACKLOG) == -1) {
- perror("TNonblockingServer::serve() listen");
+ GlobalOutput("TNonblockingServer::serve() listen");
close(serverSocket_);
return;
}
@@ -517,7 +517,7 @@
// Add the event and start up the server
if (event_add(&serverEvent, 0) == -1) {
- perror("TNonblockingServer::serve(): coult not event_add");
+ GlobalOutput("TNonblockingServer::serve(): coult not event_add");
return;
}
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index 5039564..7a1ca4f 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -100,7 +100,7 @@
flush();
fprintf(stderr, "error, current file (%s) not closed\n", filename_.c_str());
if (-1 == ::close(fd_)) {
- perror("TFileTransport: error in file close");
+ GlobalOutput("TFileTransport: error in file close");
throw TTransportException("TFileTransport: error in file close");
}
}
@@ -158,7 +158,7 @@
// close logfile
if (fd_ > 0) {
if(-1 == ::close(fd_)) {
- perror("TFileTransport: error in file close");
+ GlobalOutput("TFileTransport: error in file close");
}
}
}
@@ -306,7 +306,7 @@
// empty out both the buffers
if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
if (-1 == ::close(fd_)) {
- perror("TFileTransport: error in close");
+ GlobalOutput("TFileTransport: error in close");
throw TTransportException("TFileTransport: error in file close");
}
// just be safe and sync to disk
@@ -364,7 +364,7 @@
//T_DEBUG_L(1, "Adding padding of %u bytes at %lu (to reach chunk %lld)",
//padding, offset_, chunk2);
if (-1 == ::write(fd_, zeros, padding)) {
- perror("TFileTransport: error while padding zeros");
+ GlobalOutput("TFileTransport: error while padding zeros");
throw TTransportException("TFileTransport: error while padding zeros");
}
unflushed += padding;
@@ -375,7 +375,7 @@
// write the dequeued event to the file
if (outEvent->eventSize_ > 0) {
if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
- perror("TFileTransport: error while writing event");
+ GlobalOutput("TFileTransport: error while writing event");
throw TTransportException("TFileTransport: error while writing event");
}
@@ -499,7 +499,7 @@
// read error
if (readState_.bufferLen_ == -1) {
readState_.resetAllValues();
- perror("TFileTransport: error while reading from file");
+ GlobalOutput("TFileTransport: error while reading from file");
throw TTransportException("TFileTransport: error while reading from file");
} else if (readState_.bufferLen_ == 0) { // EOF
// wait indefinitely if there is no timeout
@@ -655,7 +655,7 @@
char errorMsg[1024];
sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu",
offset_ + readState_.lastDispatchPtr_);
- perror(errorMsg);
+ GlobalOutput(errorMsg);
throw TTransportException(errorMsg);
}
}
@@ -700,7 +700,7 @@
offset_ = lseek(fd_, newOffset, SEEK_SET);
readState_.resetAllValues();
if (offset_ == -1) {
- perror("TFileTransport: lseek error in seekToChunk");
+ GlobalOutput("TFileTransport: lseek error in seekToChunk");
throw TTransportException("TFileTransport: lseek error in seekToChunk");
}
@@ -747,7 +747,7 @@
if(fd_ == -1) {
char errorMsg[1024];
sprintf(errorMsg, "TFileTransport: Could not open file: %s", filename_.c_str());
- perror(errorMsg);
+ GlobalOutput(errorMsg);
throw TTransportException(errorMsg);
}
@@ -786,7 +786,7 @@
bool TFileTransportBuffer::addEvent(eventInfo *event) {
if (bufferMode_ == READ) {
- perror("Trying to write to a buffer in read mode");
+ GlobalOutput("Trying to write to a buffer in read mode");
}
if (writePoint_ < size_) {
buffer_[writePoint_++] = event;
diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h
index 2cb50b4..abd8928 100644
--- a/lib/cpp/src/transport/TFileTransport.h
+++ b/lib/cpp/src/transport/TFileTransport.h
@@ -207,7 +207,7 @@
void setEventBufferSize(uint32_t bufferSize) {
if (bufferAndThreadInitialized_) {
- perror("Cannot change the buffer size after writer thread started");
+ GlobalOutput("Cannot change the buffer size after writer thread started");
return;
}
eventBufferSize_ = bufferSize;
diff --git a/lib/cpp/src/transport/TServerSocket.cpp b/lib/cpp/src/transport/TServerSocket.cpp
index f341a2b..0d25bf0 100644
--- a/lib/cpp/src/transport/TServerSocket.cpp
+++ b/lib/cpp/src/transport/TServerSocket.cpp
@@ -64,7 +64,7 @@
void TServerSocket::listen() {
int sv[2];
if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
- perror("TServerSocket::init()");
+ GlobalOutput("TServerSocket::init()");
intSock1_ = -1;
intSock2_ = -1;
} else {
@@ -74,7 +74,7 @@
serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
if (serverSocket_ == -1) {
- perror("TServerSocket::listen() socket");
+ GlobalOutput("TServerSocket::listen() socket");
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not create server socket.");
}
@@ -83,7 +83,7 @@
int one = 1;
if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR,
&one, sizeof(one))) {
- perror("TServerSocket::listen() SO_REUSEADDR");
+ GlobalOutput("TServerSocket::listen() SO_REUSEADDR");
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_REUSEADDR");
}
@@ -92,7 +92,7 @@
#ifdef TCP_DEFER_ACCEPT
if (-1 == setsockopt(serverSocket_, SOL_SOCKET, TCP_DEFER_ACCEPT,
&one, sizeof(one))) {
- perror("TServerSocket::listen() TCP_DEFER_ACCEPT");
+ GlobalOutput("TServerSocket::listen() TCP_DEFER_ACCEPT");
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_DEFER_ACCEPT");
}
@@ -103,7 +103,7 @@
if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER,
&ling, sizeof(ling))) {
close();
- perror("TServerSocket::listen() SO_LINGER");
+ GlobalOutput("TServerSocket::listen() SO_LINGER");
throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER");
}
@@ -111,7 +111,7 @@
if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY,
&one, sizeof(one))) {
close();
- perror("setsockopt TCP_NODELAY");
+ GlobalOutput("setsockopt TCP_NODELAY");
throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY");
}
@@ -147,14 +147,14 @@
if (retries > retryLimit_) {
char errbuf[1024];
sprintf(errbuf, "TServerSocket::listen() BIND %d", port_);
- perror(errbuf);
+ GlobalOutput(errbuf);
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not bind");
}
// Call listen
if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
- perror("TServerSocket::listen() LISTEN");
+ GlobalOutput("TServerSocket::listen() LISTEN");
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not listen");
}
@@ -187,14 +187,14 @@
// a certain number
continue;
}
- perror("TServerSocket::acceptImpl() select -1");
+ GlobalOutput("TServerSocket::acceptImpl() select -1");
throw TTransportException(TTransportException::UNKNOWN);
} else if (ret > 0) {
// Check for an interrupt signal
if (intSock2_ >= 0 && FD_ISSET(intSock2_, &fds)) {
int8_t buf;
if (-1 == recv(intSock2_, &buf, sizeof(int8_t), 0)) {
- perror("TServerSocket::acceptImpl() interrupt receive");
+ GlobalOutput("TServerSocket::acceptImpl() interrupt receive");
}
throw TTransportException(TTransportException::INTERRUPTED);
}
@@ -203,7 +203,7 @@
break;
}
} else {
- perror("TServerSocket::acceptImpl() select 0");
+ GlobalOutput("TServerSocket::acceptImpl() select 0");
throw TTransportException(TTransportException::UNKNOWN);
}
}
@@ -215,18 +215,18 @@
(socklen_t *) &size);
if (clientSocket < 0) {
- perror("TServerSocket::accept()");
+ GlobalOutput("TServerSocket::accept()");
throw TTransportException(TTransportException::UNKNOWN, "ERROR:" + errno);
}
// Make sure client socket is blocking
int flags = fcntl(clientSocket, F_GETFL, 0);
if (flags == -1) {
- perror("TServerSocket::select() fcntl GETFL");
+ GlobalOutput("TServerSocket::select() fcntl GETFL");
throw TTransportException(TTransportException::UNKNOWN, "ERROR:" + errno);
}
if (-1 == fcntl(clientSocket, F_SETFL, flags & ~O_NONBLOCK)) {
- perror("TServerSocket::select() fcntl SETFL");
+ GlobalOutput("TServerSocket::select() fcntl SETFL");
throw TTransportException(TTransportException::UNKNOWN, "ERROR:" + errno);
}
@@ -245,7 +245,7 @@
if (intSock1_ >= 0) {
int8_t byte = 0;
if (-1 == send(intSock1_, &byte, sizeof(int8_t), 0)) {
- perror("TServerSocket::interrupt()");
+ GlobalOutput("TServerSocket::interrupt()");
}
}
}
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index 0777807..42364fa 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -96,7 +96,7 @@
uint8_t buf;
int r = recv(socket_, &buf, 1, MSG_PEEK);
if (r == -1) {
- perror("TSocket::peek()");
+ GlobalOutput("TSocket::peek()");
close();
throw TTransportException(TTransportException::UNKNOWN, "recv() ERROR:" + errno);
}
@@ -111,7 +111,7 @@
// Create socket
socket_ = socket(AF_INET, SOCK_STREAM, 0);
if (socket_ == -1) {
- perror("TSocket::open() socket");
+ GlobalOutput("TSocket::open() socket");
close();
throw TTransportException(TTransportException::NOT_OPEN, "socket() ERROR:" + errno);
}
@@ -143,7 +143,7 @@
struct hostent *host_entry = gethostbyname(host_.c_str());
if (host_entry == NULL) {
- perror("TSocket: dns error: failed call to gethostbyname.");
+ GlobalOutput("TSocket: dns error: failed call to gethostbyname.");
close();
throw TTransportException(TTransportException::NOT_OPEN, "gethostbyname() failed");
}
@@ -181,7 +181,7 @@
close();
char buff[1024];
sprintf(buff, "TSocket::open() connect %s %d", host_.c_str(), port_);
- perror(buff);
+ GlobalOutput(buff);
throw TTransportException(TTransportException::NOT_OPEN, "open() ERROR: " + errno);
}
@@ -198,22 +198,22 @@
int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, (void *)&val, &lon);
if (ret2 == -1) {
close();
- perror("TSocket::open() getsockopt SO_ERROR");
+ GlobalOutput("TSocket::open() getsockopt SO_ERROR");
throw TTransportException(TTransportException::NOT_OPEN, "open() ERROR: " + errno);
}
if (val == 0) {
goto done;
}
close();
- perror("TSocket::open() SO_ERROR was set");
+ GlobalOutput("TSocket::open() SO_ERROR was set");
throw TTransportException(TTransportException::NOT_OPEN, "open() ERROR: " + errno);
} else if (ret == 0) {
close();
- perror("TSocket::open() timeed out");
+ GlobalOutput("TSocket::open() timeed out");
throw TTransportException(TTransportException::NOT_OPEN, "open() ERROR: " + errno);
} else {
close();
- perror("TSocket::open() select error");
+ GlobalOutput("TSocket::open() select error");
throw TTransportException(TTransportException::NOT_OPEN, "open() ERROR: " + errno);
}
@@ -285,7 +285,7 @@
}
// Now it's not a try again case, but a real probblez
- perror("TSocket::read()");
+ GlobalOutput("TSocket::read()");
// If we disconnect with no linger time
if (errno == ECONNRESET) {
@@ -354,7 +354,7 @@
throw TTransportException(TTransportException::NOT_OPEN, "ENOTCONN");
}
- perror("TSocket::write() send < 0");
+ GlobalOutput("TSocket::write() send < 0");
throw TTransportException(TTransportException::UNKNOWN, "ERROR:" + errno);
}
@@ -384,7 +384,7 @@
struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_};
int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
if (ret == -1) {
- perror("TSocket::setLinger()");
+ GlobalOutput("TSocket::setLinger()");
}
}
@@ -398,7 +398,7 @@
int v = noDelay_ ? 1 : 0;
int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v));
if (ret == -1) {
- perror("TSocket::setNoDelay()");
+ GlobalOutput("TSocket::setNoDelay()");
}
}
@@ -418,7 +418,7 @@
struct timeval r = recvTimeval_;
int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, &r, sizeof(r));
if (ret == -1) {
- perror("TSocket::setRecvTimeout()");
+ GlobalOutput("TSocket::setRecvTimeout()");
}
}
@@ -432,7 +432,7 @@
(int)((sendTimeout_%1000)*1000)};
int ret = setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, &s, sizeof(s));
if (ret == -1) {
- perror("TSocket::setSendTimeout()");
+ GlobalOutput("TSocket::setSendTimeout()");
}
}
diff --git a/lib/cpp/src/transport/TSocketPool.cpp b/lib/cpp/src/transport/TSocketPool.cpp
index 1450d1c..1293e5b 100644
--- a/lib/cpp/src/transport/TSocketPool.cpp
+++ b/lib/cpp/src/transport/TSocketPool.cpp
@@ -28,7 +28,7 @@
alwaysTryLast_(true)
{
if (hosts.size() != ports.size()) {
- perror("TSocketPool::TSocketPool: hosts.size != ports.size");
+ GlobalOutput("TSocketPool::TSocketPool: hosts.size != ports.size");
throw TTransportException(TTransportException::BAD_ARGS);
}
@@ -94,7 +94,7 @@
}
}
- perror("TSocketPool::open: all connections failed");
+ GlobalOutput("TSocketPool::open: all connections failed");
throw TTransportException(TTransportException::NOT_OPEN);
}