THRIFT-928. cpp: Thrift Server Client Stats

Add the ability for Thrift servers to monitor client connections.  It is
activated by #including server/TClientInfo.h and creating 1) a
TClientInfoCallHandler passed to the processor with setEventHandler()
and 2) a TClientInforServerHandler passed to the server with
setServerEventHandler().

The result vector, showing active connections, provides client address
and the thrift call it is executing (or last executed), the time
connected, and the number of calls made since connection.

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005139 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h
index 7858166..16b46df 100644
--- a/lib/cpp/src/TProcessor.h
+++ b/lib/cpp/src/TProcessor.h
@@ -44,7 +44,7 @@
    * The return value is passed to all other callbacks
    * for that function invocation.
    */
-  virtual void* getContext(const char* fn_name) { return NULL; }
+  virtual void* getContext(const char* fn_name, void* serverContext) { return NULL; }
 
   /**
    * Expected to free resources associated with a context.
@@ -112,10 +112,12 @@
   virtual ~TProcessor() {}
 
   virtual bool process(boost::shared_ptr<protocol::TProtocol> in,
-                       boost::shared_ptr<protocol::TProtocol> out) = 0;
+                       boost::shared_ptr<protocol::TProtocol> out,
+                       void* connectionContext) = 0;
 
-  bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> io) {
-    return process(io, io);
+  bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> io,
+               void* connectionContext) {
+    return process(io, io, connectionContext);
   }
 
   boost::shared_ptr<TProcessorEventHandler> getEventHandler() {
diff --git a/lib/cpp/src/processor/PeekProcessor.cpp b/lib/cpp/src/processor/PeekProcessor.cpp
index c721861..076975b 100644
--- a/lib/cpp/src/processor/PeekProcessor.cpp
+++ b/lib/cpp/src/processor/PeekProcessor.cpp
@@ -58,7 +58,8 @@
 }
 
 bool PeekProcessor::process(boost::shared_ptr<TProtocol> in,
-                            boost::shared_ptr<TProtocol> out) {
+                            boost::shared_ptr<TProtocol> out,
+                            void* connectionContext) {
 
   std::string fname;
   TMessageType mtype;
@@ -100,7 +101,7 @@
   // Done peeking at variables
   peekEnd();
 
-  bool ret = actualProcessor_->process(pipedProtocol_, out);
+  bool ret = actualProcessor_->process(pipedProtocol_, out, connectionContext);
   memoryBuffer_->resetBuffer();
   return ret;
 }
diff --git a/lib/cpp/src/processor/PeekProcessor.h b/lib/cpp/src/processor/PeekProcessor.h
index 0f7c016..cb703f6 100644
--- a/lib/cpp/src/processor/PeekProcessor.h
+++ b/lib/cpp/src/processor/PeekProcessor.h
@@ -53,7 +53,8 @@
   void setTargetTransport(boost::shared_ptr<apache::thrift::transport::TTransport> targetTransport);
 
   virtual bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> in,
-                       boost::shared_ptr<apache::thrift::protocol::TProtocol> out);
+                       boost::shared_ptr<apache::thrift::protocol::TProtocol> out,
+                       void* connectionContext);
 
   // The following three functions can be overloaded by child classes to
   // achieve desired peeking behavior
diff --git a/lib/cpp/src/processor/StatsProcessor.h b/lib/cpp/src/processor/StatsProcessor.h
index 820b3ad..8600c6b 100644
--- a/lib/cpp/src/processor/StatsProcessor.h
+++ b/lib/cpp/src/processor/StatsProcessor.h
@@ -39,7 +39,9 @@
   {}
   virtual ~StatsProcessor() {};
 
-  virtual bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr<apache::thrift::protocol::TProtocol> poprot) {
+  virtual bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> piprot,
+                       boost::shared_ptr<apache::thrift::protocol::TProtocol> poprot,
+                       void* serverContext) {
 
     piprot_ = piprot;
 
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 85fe265..4245d5e 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -56,7 +56,7 @@
 
   void run() {
     try {
-      while (processor_->process(input_, output_)) {
+      while (processor_->process(input_, output_, NULL)) {
         if (!input_->getTransport()->peek()) {
           break;
         }
@@ -293,7 +293,7 @@
     } else {
       try {
         // Invoke the processor
-        server_->getProcessor()->process(inputProtocol_, outputProtocol_);
+        server_->getProcessor()->process(inputProtocol_, outputProtocol_, NULL);
       } catch (TTransportException &ttx) {
         GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
         server_->decrementActiveProcessors();
diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h
index 5c4c588..4dddfea 100644
--- a/lib/cpp/src/server/TServer.h
+++ b/lib/cpp/src/server/TServer.h
@@ -57,14 +57,33 @@
   /**
    * Called when a new client has connected and is about to being processing.
    */
-  virtual void clientBegin(boost::shared_ptr<TProtocol> /* input */,
-                           boost::shared_ptr<TProtocol> /* output */) {}
+  virtual void* createContext(boost::shared_ptr<TProtocol> input,
+                              boost::shared_ptr<TProtocol> output) {
+    (void)input;
+    (void)output;
+    return NULL;
+  }
 
   /**
-   * Called when a client has finished making requests.
+   * Called when a client has finished request-handling to delete server
+   * context.
    */
-  virtual void clientEnd(boost::shared_ptr<TProtocol> /* input */,
-                         boost::shared_ptr<TProtocol> /* output */) {}
+  virtual void deleteContext(void* serverContext,
+                             boost::shared_ptr<TProtocol>input,
+                             boost::shared_ptr<TProtocol>output) {
+    (void)serverContext;
+    (void)input;
+    (void)output;
+  }
+
+  /**
+   * Called when a client is about to call the processor.
+   */
+  virtual void processContext(void* serverContext,
+                              boost::shared_ptr<TTransport> transport) {
+    (void)serverContext;
+    (void)transport;
+}
 
  protected:
 
diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp
index 394ce21..438a587 100644
--- a/lib/cpp/src/server/TSimpleServer.cpp
+++ b/lib/cpp/src/server/TSimpleServer.cpp
@@ -63,13 +63,18 @@
       outputTransport = outputTransportFactory_->getTransport(client);
       inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
       outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
+      void* connectionContext = NULL;
       if (eventHandler_ != NULL) {
-        eventHandler_->clientBegin(inputProtocol, outputProtocol);
+        connectionContext = eventHandler_->createContext(inputProtocol, outputProtocol);
       }
       try {
-        while (processor_->process(inputProtocol, outputProtocol)) {
-          // Peek ahead, is the remote side closed?
-          if (!inputTransport->peek()) {
+        for (;;) {
+          if (eventHandler_ != NULL) {
+            eventHandler_->processContext(connectionContext, client);
+          }
+          if (!processor_->process(inputProtocol, outputProtocol, connectionContext) ||
+              // Peek ahead, is the remote side closed?
+              !inputProtocol->getTransport()->peek()) {
             break;
           }
         }
@@ -79,7 +84,7 @@
         cerr << "TSimpleServer exception: " << tx.what() << endl;
       }
       if (eventHandler_ != NULL) {
-        eventHandler_->clientEnd(inputProtocol, outputProtocol);
+        eventHandler_->deleteContext(connectionContext, inputProtocol, outputProtocol);
       }
       inputTransport->close();
       outputTransport->close();
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
index 6eea3db..18319be 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -40,11 +40,13 @@
   Task(TThreadPoolServer &server,
        shared_ptr<TProcessor> processor,
        shared_ptr<TProtocol> input,
-       shared_ptr<TProtocol> output) :
+       shared_ptr<TProtocol> output,
+       shared_ptr<TTransport> transport) :
     server_(server),
     processor_(processor),
     input_(input),
-    output_(output) {
+    output_(output),
+    transport_(transport) {
   }
 
   ~Task() {}
@@ -52,12 +54,17 @@
   void run() {
     boost::shared_ptr<TServerEventHandler> eventHandler =
       server_.getEventHandler();
+    void* connectionContext = NULL;
     if (eventHandler != NULL) {
-      eventHandler->clientBegin(input_, output_);
+      connectionContext = eventHandler->createContext(input_, output_);
     }
     try {
-      while (processor_->process(input_, output_)) {
-        if (!input_->getTransport()->peek()) {
+      for (;;) {
+        if (eventHandler != NULL) {
+          eventHandler->processContext(connectionContext, transport_);
+        }
+        if (!processor_->process(input_, output_, connectionContext) ||
+            !input_->getTransport()->peek()) {
           break;
         }
       }
@@ -78,7 +85,7 @@
     }
 
     if (eventHandler != NULL) {
-      eventHandler->clientEnd(input_, output_);
+      eventHandler->deleteContext(connectionContext, input_, output_);
     }
 
     try {
@@ -101,7 +108,7 @@
   shared_ptr<TProcessor> processor_;
   shared_ptr<TProtocol> input_;
   shared_ptr<TProtocol> output_;
-
+  shared_ptr<TTransport> transport_;
 };
 
 TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
@@ -167,7 +174,7 @@
       outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
 
       // Add to threadmanager pool
-      threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol)), timeout_);
+      threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol, client)), timeout_);
 
     } catch (TTransportException& ttx) {
       if (inputTransport != NULL) { inputTransport->close(); }
diff --git a/lib/cpp/src/server/TThreadedServer.cpp b/lib/cpp/src/server/TThreadedServer.cpp
index cc30f8f..11718ca 100644
--- a/lib/cpp/src/server/TThreadedServer.cpp
+++ b/lib/cpp/src/server/TThreadedServer.cpp
@@ -42,11 +42,13 @@
   Task(TThreadedServer& server,
        shared_ptr<TProcessor> processor,
        shared_ptr<TProtocol> input,
-       shared_ptr<TProtocol> output) :
+       shared_ptr<TProtocol> output,
+       shared_ptr<TTransport> transport) :
     server_(server),
     processor_(processor),
     input_(input),
-    output_(output) {
+    output_(output),
+    transport_(transport) {
   }
 
   ~Task() {}
@@ -54,12 +56,17 @@
   void run() {
     boost::shared_ptr<TServerEventHandler> eventHandler =
       server_.getEventHandler();
+    void* connectionContext = NULL;
     if (eventHandler != NULL) {
-      eventHandler->clientBegin(input_, output_);
+      connectionContext = eventHandler->createContext(input_, output_);
     }
     try {
-      while (processor_->process(input_, output_)) {
-        if (!input_->getTransport()->peek()) {
+      for (;;) {
+        if (eventHandler != NULL) {
+          eventHandler->processContext(connectionContext, transport_);
+        }
+        if (!processor_->process(input_, output_, connectionContext) ||
+            !input_->getTransport()->peek()) {
           break;
         }
       }
@@ -73,7 +80,7 @@
       GlobalOutput("TThreadedServer uncaught exception.");
     }
     if (eventHandler != NULL) {
-      eventHandler->clientEnd(input_, output_);
+      eventHandler->deleteContext(connectionContext, input_, output_);
     }
 
     try {
@@ -107,6 +114,7 @@
   shared_ptr<TProcessor> processor_;
   shared_ptr<TProtocol> input_;
   shared_ptr<TProtocol> output_;
+  shared_ptr<TTransport> transport_;
 };
 
 
@@ -173,7 +181,8 @@
       TThreadedServer::Task* task = new TThreadedServer::Task(*this,
                                                               processor_,
                                                               inputProtocol,
-                                                              outputProtocol);
+                                                              outputProtocol,
+                                                              client);
 
       // Create a task
       shared_ptr<Runnable> runnable =
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index 0b41694..40841ac 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -966,7 +966,7 @@
     // bad form to use exceptions for flow control but there is really
     // no other way around it
     try {
-      processor_->process(inputProtocol, outputProtocol);
+      processor_->process(inputProtocol, outputProtocol, NULL);
       numProcessed++;
       if ( (numEvents > 0) && (numProcessed == numEvents)) {
         return;
@@ -998,7 +998,7 @@
     // bad form to use exceptions for flow control but there is really
     // no other way around it
     try {
-      processor_->process(inputProtocol, outputProtocol);
+      processor_->process(inputProtocol, outputProtocol, NULL);
       if (curChunk != inputTransport_->getCurChunk()) {
         break;
       }
diff --git a/lib/cpp/src/transport/TServerSocket.cpp b/lib/cpp/src/transport/TServerSocket.cpp
index 836f6ba..90a27ce 100644
--- a/lib/cpp/src/transport/TServerSocket.cpp
+++ b/lib/cpp/src/transport/TServerSocket.cpp
@@ -393,7 +393,8 @@
   if (recvTimeout_ > 0) {
     client->setRecvTimeout(recvTimeout_);
   }
-
+  client->setCachedAddress((sockaddr*) &clientAddress, size);
+  
   return client;
 }
 
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index 951ddcf..ee76c3f 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -78,6 +78,7 @@
   maxRecvRetries_(5) {
   recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
   recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+  cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
 }
 
 TSocket::TSocket() :
@@ -94,6 +95,7 @@
   maxRecvRetries_(5) {
   recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
   recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+  cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
 }
 
 TSocket::TSocket(int socket) :
@@ -110,6 +112,7 @@
   maxRecvRetries_(5) {
   recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
   recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+  cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
 }
 
 TSocket::~TSocket() {
@@ -273,6 +276,8 @@
  done:
   // Set socket back to normal mode (blocking)
   fcntl(socket_, F_SETFL, flags);
+
+  setCachedAddress(res->ai_addr, res->ai_addrlen);
 }
 
 void TSocket::open() {
@@ -600,22 +605,29 @@
 std::string TSocket::getPeerHost() {
   if (peerHost_.empty()) {
     struct sockaddr_storage addr;
-    socklen_t addrLen = sizeof(addr);
+    struct sockaddr* addrPtr;
+    socklen_t addrLen;
 
     if (socket_ < 0) {
       return host_;
     }
 
-    int rv = getpeername(socket_, (sockaddr*) &addr, &addrLen);
+    addrPtr = getCachedAddress(&addrLen);
 
-    if (rv != 0) {
-      return peerHost_;
+    if (addrPtr == NULL) {
+      addrLen = sizeof(addr);
+      if (getpeername(socket_, (sockaddr*) &addr, &addrLen) != 0) {
+        return peerHost_;
+      }
+      addrPtr = (sockaddr*)&addr;
+
+      setCachedAddress(addrPtr, addrLen);
     }
 
     char clienthost[NI_MAXHOST];
     char clientservice[NI_MAXSERV];
 
-    getnameinfo((sockaddr*) &addr, addrLen,
+    getnameinfo((sockaddr*) addrPtr, addrLen,
                 clienthost, sizeof(clienthost),
                 clientservice, sizeof(clientservice), 0);
 
@@ -627,22 +639,29 @@
 std::string TSocket::getPeerAddress() {
   if (peerAddress_.empty()) {
     struct sockaddr_storage addr;
-    socklen_t addrLen = sizeof(addr);
+    struct sockaddr* addrPtr;
+    socklen_t addrLen;
 
     if (socket_ < 0) {
       return peerAddress_;
     }
 
-    int rv = getpeername(socket_, (sockaddr*) &addr, &addrLen);
+    addrPtr = getCachedAddress(&addrLen);
 
-    if (rv != 0) {
-      return peerAddress_;
+    if (addrPtr == NULL) {
+      addrLen = sizeof(addr);
+      if (getpeername(socket_, (sockaddr*) &addr, &addrLen) != 0) {
+        return peerAddress_;
+      }
+      addrPtr = (sockaddr*)&addr;
+
+      setCachedAddress(addrPtr, addrLen);
     }
 
     char clienthost[NI_MAXHOST];
     char clientservice[NI_MAXSERV];
 
-    getnameinfo((sockaddr*) &addr, addrLen,
+    getnameinfo(addrPtr, addrLen,
                 clienthost, sizeof(clienthost),
                 clientservice, sizeof(clientservice),
                 NI_NUMERICHOST|NI_NUMERICSERV);
@@ -658,6 +677,37 @@
   return peerPort_;
 }
 
+void TSocket::setCachedAddress(const sockaddr* addr, socklen_t len) {
+  switch (addr->sa_family) {
+  case AF_INET:
+    if (len == sizeof(sockaddr_in)) {
+      memcpy((void*)&cachedPeerAddr_.ipv4, (void*)addr, len);
+    }
+    break;
+
+  case AF_INET6:
+    if (len == sizeof(sockaddr_in6)) {
+      memcpy((void*)&cachedPeerAddr_.ipv6, (void*)addr, len);
+    }
+    break;
+  }
+}
+
+sockaddr* TSocket::getCachedAddress(socklen_t* len) const {
+  switch (cachedPeerAddr_.ipv4.sin_family) {
+  case AF_INET:
+    *len = sizeof(sockaddr_in);
+    return (sockaddr*) &cachedPeerAddr_.ipv4;
+
+  case AF_INET6:
+    *len = sizeof(sockaddr_in6);
+    return (sockaddr*) &cachedPeerAddr_.ipv6;
+
+  default:
+    return NULL;
+  }
+} 
+
 bool TSocket::useLowMinRto_ = false;
 void TSocket::setUseLowMinRto(bool useLowMinRto) {
   useLowMinRto_ = useLowMinRto;
diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h
index f195438..47a702d 100644
--- a/lib/cpp/src/transport/TSocket.h
+++ b/lib/cpp/src/transport/TSocket.h
@@ -193,6 +193,18 @@
   int getPeerPort();
 
   /**
+   * Returns the underlying socket file descriptor.
+   */
+  int getSocketFD() {
+    return socket_;
+  }
+
+  /*
+   * Returns a cached copy of the peer address.
+   */
+  sockaddr* getCachedAddress(socklen_t* len) const;
+
+  /**
    * Sets whether to use a low minimum TCP retransmission timeout.
    */
   static void setUseLowMinRto(bool useLowMinRto);
@@ -211,6 +223,12 @@
   /** connect, called by open */
   void openConnection(struct addrinfo *res);
 
+  /**
+   * Set a cache of the peer address (used when trivially available: e.g.
+   * accept() or connect()). Only caches IPV4 and IPV6; unset for others.
+   */
+  void setCachedAddress(const sockaddr* addr, socklen_t len);
+
   /** Host to connect to */
   std::string host_;
 
@@ -256,6 +274,15 @@
   /** Recv timeout timeval */
   struct timeval recvTimeval_;
 
+  /** Cached peer address */
+  union {
+    sockaddr_in ipv4;
+    sockaddr_in6 ipv6;
+  } cachedPeerAddr_;
+
+  /** Connection start time */
+  timespec startTime_;
+
   /** Whether to use low minimum TCP retransmission timeout */
   static bool useLowMinRto_;