THRIFT-928. cpp: Thrift Server Client Stats
Add the ability for Thrift servers to monitor client connections. It is
activated by #including server/TClientInfo.h and creating 1) a
TClientInfoCallHandler passed to the processor with setEventHandler()
and 2) a TClientInforServerHandler passed to the server with
setServerEventHandler().
The result vector, showing active connections, provides client address
and the thrift call it is executing (or last executed), the time
connected, and the number of calls made since connection.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005139 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index 0b41694..40841ac 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -966,7 +966,7 @@
// bad form to use exceptions for flow control but there is really
// no other way around it
try {
- processor_->process(inputProtocol, outputProtocol);
+ processor_->process(inputProtocol, outputProtocol, NULL);
numProcessed++;
if ( (numEvents > 0) && (numProcessed == numEvents)) {
return;
@@ -998,7 +998,7 @@
// bad form to use exceptions for flow control but there is really
// no other way around it
try {
- processor_->process(inputProtocol, outputProtocol);
+ processor_->process(inputProtocol, outputProtocol, NULL);
if (curChunk != inputTransport_->getCurChunk()) {
break;
}
diff --git a/lib/cpp/src/transport/TServerSocket.cpp b/lib/cpp/src/transport/TServerSocket.cpp
index 836f6ba..90a27ce 100644
--- a/lib/cpp/src/transport/TServerSocket.cpp
+++ b/lib/cpp/src/transport/TServerSocket.cpp
@@ -393,7 +393,8 @@
if (recvTimeout_ > 0) {
client->setRecvTimeout(recvTimeout_);
}
-
+ client->setCachedAddress((sockaddr*) &clientAddress, size);
+
return client;
}
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index 951ddcf..ee76c3f 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -78,6 +78,7 @@
maxRecvRetries_(5) {
recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+ cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
}
TSocket::TSocket() :
@@ -94,6 +95,7 @@
maxRecvRetries_(5) {
recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+ cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
}
TSocket::TSocket(int socket) :
@@ -110,6 +112,7 @@
maxRecvRetries_(5) {
recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+ cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
}
TSocket::~TSocket() {
@@ -273,6 +276,8 @@
done:
// Set socket back to normal mode (blocking)
fcntl(socket_, F_SETFL, flags);
+
+ setCachedAddress(res->ai_addr, res->ai_addrlen);
}
void TSocket::open() {
@@ -600,22 +605,29 @@
std::string TSocket::getPeerHost() {
if (peerHost_.empty()) {
struct sockaddr_storage addr;
- socklen_t addrLen = sizeof(addr);
+ struct sockaddr* addrPtr;
+ socklen_t addrLen;
if (socket_ < 0) {
return host_;
}
- int rv = getpeername(socket_, (sockaddr*) &addr, &addrLen);
+ addrPtr = getCachedAddress(&addrLen);
- if (rv != 0) {
- return peerHost_;
+ if (addrPtr == NULL) {
+ addrLen = sizeof(addr);
+ if (getpeername(socket_, (sockaddr*) &addr, &addrLen) != 0) {
+ return peerHost_;
+ }
+ addrPtr = (sockaddr*)&addr;
+
+ setCachedAddress(addrPtr, addrLen);
}
char clienthost[NI_MAXHOST];
char clientservice[NI_MAXSERV];
- getnameinfo((sockaddr*) &addr, addrLen,
+ getnameinfo((sockaddr*) addrPtr, addrLen,
clienthost, sizeof(clienthost),
clientservice, sizeof(clientservice), 0);
@@ -627,22 +639,29 @@
std::string TSocket::getPeerAddress() {
if (peerAddress_.empty()) {
struct sockaddr_storage addr;
- socklen_t addrLen = sizeof(addr);
+ struct sockaddr* addrPtr;
+ socklen_t addrLen;
if (socket_ < 0) {
return peerAddress_;
}
- int rv = getpeername(socket_, (sockaddr*) &addr, &addrLen);
+ addrPtr = getCachedAddress(&addrLen);
- if (rv != 0) {
- return peerAddress_;
+ if (addrPtr == NULL) {
+ addrLen = sizeof(addr);
+ if (getpeername(socket_, (sockaddr*) &addr, &addrLen) != 0) {
+ return peerAddress_;
+ }
+ addrPtr = (sockaddr*)&addr;
+
+ setCachedAddress(addrPtr, addrLen);
}
char clienthost[NI_MAXHOST];
char clientservice[NI_MAXSERV];
- getnameinfo((sockaddr*) &addr, addrLen,
+ getnameinfo(addrPtr, addrLen,
clienthost, sizeof(clienthost),
clientservice, sizeof(clientservice),
NI_NUMERICHOST|NI_NUMERICSERV);
@@ -658,6 +677,37 @@
return peerPort_;
}
+void TSocket::setCachedAddress(const sockaddr* addr, socklen_t len) {
+ switch (addr->sa_family) {
+ case AF_INET:
+ if (len == sizeof(sockaddr_in)) {
+ memcpy((void*)&cachedPeerAddr_.ipv4, (void*)addr, len);
+ }
+ break;
+
+ case AF_INET6:
+ if (len == sizeof(sockaddr_in6)) {
+ memcpy((void*)&cachedPeerAddr_.ipv6, (void*)addr, len);
+ }
+ break;
+ }
+}
+
+sockaddr* TSocket::getCachedAddress(socklen_t* len) const {
+ switch (cachedPeerAddr_.ipv4.sin_family) {
+ case AF_INET:
+ *len = sizeof(sockaddr_in);
+ return (sockaddr*) &cachedPeerAddr_.ipv4;
+
+ case AF_INET6:
+ *len = sizeof(sockaddr_in6);
+ return (sockaddr*) &cachedPeerAddr_.ipv6;
+
+ default:
+ return NULL;
+ }
+}
+
bool TSocket::useLowMinRto_ = false;
void TSocket::setUseLowMinRto(bool useLowMinRto) {
useLowMinRto_ = useLowMinRto;
diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h
index f195438..47a702d 100644
--- a/lib/cpp/src/transport/TSocket.h
+++ b/lib/cpp/src/transport/TSocket.h
@@ -193,6 +193,18 @@
int getPeerPort();
/**
+ * Returns the underlying socket file descriptor.
+ */
+ int getSocketFD() {
+ return socket_;
+ }
+
+ /*
+ * Returns a cached copy of the peer address.
+ */
+ sockaddr* getCachedAddress(socklen_t* len) const;
+
+ /**
* Sets whether to use a low minimum TCP retransmission timeout.
*/
static void setUseLowMinRto(bool useLowMinRto);
@@ -211,6 +223,12 @@
/** connect, called by open */
void openConnection(struct addrinfo *res);
+ /**
+ * Set a cache of the peer address (used when trivially available: e.g.
+ * accept() or connect()). Only caches IPV4 and IPV6; unset for others.
+ */
+ void setCachedAddress(const sockaddr* addr, socklen_t len);
+
/** Host to connect to */
std::string host_;
@@ -256,6 +274,15 @@
/** Recv timeout timeval */
struct timeval recvTimeval_;
+ /** Cached peer address */
+ union {
+ sockaddr_in ipv4;
+ sockaddr_in6 ipv6;
+ } cachedPeerAddr_;
+
+ /** Connection start time */
+ timespec startTime_;
+
/** Whether to use low minimum TCP retransmission timeout */
static bool useLowMinRto_;