THRIFT-928. cpp: Thrift Server Client Stats

Add the ability for Thrift servers to monitor client connections.  It is
activated by #including server/TClientInfo.h and creating 1) a
TClientInfoCallHandler passed to the processor with setEventHandler()
and 2) a TClientInforServerHandler passed to the server with

The result vector, showing active connections, provides client address
and the thrift call it is executing (or last executed), the time
connected, and the number of calls made since connection.

git-svn-id: 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/compiler/cpp/src/generate/ b/compiler/cpp/src/generate/
index c8d9dda..a0e11f8 100644
--- a/compiler/cpp/src/generate/
+++ b/compiler/cpp/src/generate/
@@ -2403,12 +2403,18 @@
   string finish_cob;
   string finish_cob_decl;
   string cob_arg;
+  string call_context = ", void* callContext";
+  string call_context_arg = ", callContext";
+  string call_context_decl = ", void*";
   string ret_type = "bool ";
   if (style == "Cob") {
     ifstyle = "CobSv";
     pstyle = "Async";
     finish_cob = "std::tr1::function<void(bool ok)> cob, ";
     finish_cob_decl = "std::tr1::function<void(bool ok)>, ";
+    call_context = "";  // TODO(edhall) remove when callContext is aded to TAsyncProcessor
+    call_context_arg = "";  // ditto
+    call_context_decl = ""; // ditto
     cob_arg = "cob, ";
     ret_type = "void ";
@@ -2452,7 +2458,7 @@
   f_header_ <<
     indent() << "boost::shared_ptr<" << service_name_ << ifstyle << "If> iface_;" << endl;
   f_header_ <<
-    indent() << "virtual " << ret_type << "process_fn(" << finish_cob << "::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid);" << endl;
+    indent() << "virtual " << ret_type << "process_fn(" << finish_cob << "::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid" << call_context << ");" << endl;
   // Process function declarations
@@ -2463,14 +2469,16 @@
     indent() << "std::map<std::string, void (" <<
     service_name_ << pstyle << class_suffix << "::*)(" << finish_cob_decl <<
     "int32_t, ::apache::thrift::protocol::TProtocol*, " <<
-    "::apache::thrift::protocol::TProtocol*)> processMap_;" << endl;
+    "::apache::thrift::protocol::TProtocol*" << call_context_decl <<
+    ")> processMap_;" << endl;
   for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
     indent(f_header_) <<
-      "void process_" << (*f_iter)->get_name() << "(" << finish_cob << "int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot);" << endl;
+      "void process_" << (*f_iter)->get_name() << "(" << finish_cob << "int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot" << call_context << ");" << endl;
     if (gen_templates_) {
       indent(f_header_) <<
         "void process_" << (*f_iter)->get_name() << "(" << finish_cob <<
-        "int32_t seqid, Protocol_* iprot, Protocol_* oprot);" << endl;
+        "int32_t seqid, Protocol_* iprot, Protocol_* oprot" <<
+        call_context << ");" << endl;
     if (style == "Cob") {
       // XXX Factor this out, even if it is a pain.
@@ -2539,7 +2547,7 @@
     declare_map <<
     indent() << "}" << endl <<
     endl <<
-    indent() << "virtual " << ret_type << "process(" << finish_cob << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot);" << endl <<
+    indent() << "virtual " << ret_type << "process(" << finish_cob << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot" << call_context << ");" << endl <<
     indent() << "virtual ~" << service_name_ << pstyle << class_suffix <<
     "() {}" << endl;
@@ -2553,8 +2561,8 @@
     ret_type << service_name_ << pstyle << class_suffix << template_suffix <<
     "::process(" << finish_cob <<
     "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, " <<
-    "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot) {" <<
-    endl;
+    "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot" <<
+    call_context << ") {" << endl;
   out <<
@@ -2581,7 +2589,7 @@
     indent() << "}" << endl <<
     endl <<
     indent() << "return process_fn(" << (style == "Cob" ? "cob, " : "")
-             << "iprot, oprot, fname, seqid);" <<
+             << "iprot, oprot, fname, seqid" << call_context_arg << ");" <<
@@ -2595,7 +2603,7 @@
     "::process_fn(" << finish_cob <<
     "::apache::thrift::protocol::TProtocol* iprot, " <<
     "::apache::thrift::protocol::TProtocol* oprot, " <<
-    "std::string& fname, int32_t seqid) {" << endl;
+    "std::string& fname, int32_t seqid" << call_context << ") {" << endl;
   // HOT: member function pointer map
@@ -2603,7 +2611,7 @@
     indent() << typename_str << "std::map<std::string, void (" <<
     service_name_ << pstyle << class_suffix << "::*)(" << finish_cob_decl <<
     "int32_t, ::apache::thrift::protocol::TProtocol*, " <<
-    "::apache::thrift::protocol::TProtocol*)>::iterator pfn;" << endl <<
+    "::apache::thrift::protocol::TProtocol*" << call_context_decl << ")>::iterator pfn;" << endl <<
     indent() << "pfn = processMap_.find(fname);" << endl <<
     indent() << "if (pfn == processMap_.end()) {" << endl;
   if (extends.empty()) {
@@ -2623,11 +2631,11 @@
       indent() << "  return "
                << extends << "::process_fn("
                << (style == "Cob" ? "cob, " : "")
-               << "iprot, oprot, fname, seqid);" << endl;
+               << "iprot, oprot, fname, seqid" << call_context_arg << ");" << endl;
   out <<
     indent() << "}" << endl <<
-    indent() << "(this->*(pfn->second))(" << cob_arg << "seqid, iprot, oprot);" << endl;
+    indent() << "(this->*(pfn->second))(" << cob_arg << "seqid, iprot, oprot" << call_context_arg << ");" << endl;
   // TODO(dreiss): return pfn ret?
   if (style == "Cob") {
@@ -2746,7 +2754,7 @@
     out <<
       "void " << tservice->get_name() << "Processor" << class_suffix << "::" <<
       "process_" << tfunction->get_name() << "(int32_t seqid, " <<
-      prot_type << "* iprot, " << prot_type << "* oprot)" << endl;
+      prot_type << "* iprot, " << prot_type << "* oprot, void* callContext)" << endl;
     if (gen_templates_ && !specialized) {
@@ -2759,7 +2767,7 @@
         endl <<
         indent() << "if (_iprot && _oprot) {" << endl <<
         indent() << "  return process_" << tfunction->get_name() <<
-        "(seqid, _iprot, _oprot);" << endl <<
+        "(seqid, _iprot, _oprot, callContext);" << endl <<
         indent() << "}" << endl << endl;
@@ -2769,7 +2777,7 @@
     out <<
       indent() << "void* ctx = NULL;" << endl <<
       indent() << "if (eventHandler_.get() != NULL) {" << endl <<
-      indent() << "  ctx = eventHandler_->getContext(\"" << service_func_name << "\");" << endl <<
+      indent() << "  ctx = eventHandler_->getContext(\"" << service_func_name << "\", callContext);" << endl <<
       indent() << "}" << endl <<
       indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << service_func_name << "\");" << endl << endl <<
       indent() << "if (eventHandler_.get() != NULL) {" << endl <<
@@ -2901,6 +2909,7 @@
   // Cob style.
   else {
     // Processor entry point.
+    // TODO(edhall) update for callContext when TEventServer is ready
     if (gen_templates_) {
       out <<
         indent() << "template <class Protocol_>" << endl;
@@ -2932,7 +2941,7 @@
       indent() << tservice->get_name() + "_" + tfunction->get_name() + "_args" << " args;" << endl <<
       indent() << "void* ctx = NULL;" << endl <<
       indent() << "if (eventHandler_.get() != NULL) {" << endl <<
-      indent() << "  ctx = eventHandler_->getContext(\"" << service_func_name << "\");" << endl <<
+      indent() << "  ctx = eventHandler_->getContext(\"" << service_func_name << "\", NULL);" << endl <<
       indent() << "}" << endl <<
       indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << service_func_name << "\");" << endl << endl <<
       indent() << "try {" << endl;
@@ -3073,7 +3082,7 @@
       out <<
         endl <<
         indent() << "if (eventHandler_.get() != NULL) {" << endl <<
-        indent() << "  ctx = eventHandler_->getContext(\"" << service_func_name << "\");" << endl <<
+        indent() << "  ctx = eventHandler_->getContext(\"" << service_func_name << "\", NULL);" << endl <<
         indent() << "}" << endl <<
         indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << service_func_name << "\");" << endl << endl <<
         indent() << "if (eventHandler_.get() != NULL) {" << endl <<
@@ -3142,7 +3151,7 @@
       out <<
         endl <<
         indent() << "if (eventHandler_.get() != NULL) {" << endl <<
-        indent() << "  ctx = eventHandler_->getContext(\"" << service_func_name << "\");" << endl <<
+        indent() << "  ctx = eventHandler_->getContext(\"" << service_func_name << "\", NULL);" << endl <<
         indent() << "}" << endl <<
         indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << service_func_name << "\");" << endl << endl <<
         indent() << "if (eventHandler_.get() != NULL) {" << endl <<
diff --git a/contrib/fb303/TClientInfo.cpp b/contrib/fb303/TClientInfo.cpp
new file mode 100644
index 0000000..d093017
--- /dev/null
+++ b/contrib/fb303/TClientInfo.cpp
@@ -0,0 +1,179 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <server/TClientInfo.h>
+namespace apache { namespace thrift { namespace server {
+using namespace apache::thrift;
+using namespace apache::thrift::transport;
+TClientInfoConnection::TClientInfoConnection() {
+  call_[kNameLen - 1] = '\0';    // insure NUL terminator is there
+  eraseAddr();
+  eraseCall();
+void TClientInfoConnection::recordAddr(const sockaddr* addr) {
+  eraseAddr();
+  initTime();
+  ncalls_ = 0;
+  if (addr != NULL) {
+    if (addr->sa_family == AF_INET) {
+      memcpy((void*)&addr_.ipv4, (const void *)addr, sizeof(sockaddr_in));
+    }
+    else if (addr->sa_family == AF_INET6) {
+      memcpy((void*)&addr_.ipv6, (const void *)addr, sizeof(sockaddr_in6));
+    }
+  }
+void TClientInfoConnection::eraseAddr() {
+  addr_.ipv4.sin_family = AF_UNSPEC;
+const char* TClientInfoConnection::getAddr(char* buf, int len) const {
+  switch (addr_.ipv4.sin_family) {
+  case AF_INET:
+    return inet_ntop(AF_INET, &addr_.ipv4.sin_addr, buf, len);
+  case AF_INET6:
+    return inet_ntop(AF_INET6, &addr_.ipv6.sin6_addr, buf, len);
+  default:
+    return NULL;
+  }
+void TClientInfoConnection::recordCall(const char* name) {
+  strncpy(call_, name, kNameLen - 1);   // NUL terminator set in constructor
+  ncalls_++;
+void TClientInfoConnection::eraseCall() {
+  call_[0] = '\0';
+const char* TClientInfoConnection::getCall() const {
+  if (call_[0] == '\0') {
+      return NULL;
+  }
+  return call_;
+void TClientInfoConnection::getTime(timespec* time) const {
+  *time = time_;
+uint64_t TClientInfoConnection::getNCalls() const {
+  return ncalls_;
+void TClientInfoConnection::initTime() {
+  clock_gettime(CLOCK_REALTIME, &time_);
+TClientInfoConnection* TClientInfo::getConnection(int fd, bool grow) {
+  if (fd < 0 || (!grow && fd >= info_.size())) {
+    return NULL;
+  }
+  return &info_[fd];
+size_t TClientInfo::size() const {
+    return info_.size();
+void* TClientInfoServerHandler::createContext(boost::shared_ptr<TProtocol> input,
+                                              boost::shared_ptr<TProtocol> output) {
+  (void)input;
+  (void)output;
+  return (void*) new Connect(&clientInfo_);
+void TClientInfoServerHandler::deleteContext(void* connectionContext,
+                                             boost::shared_ptr<TProtocol> input,
+                                             boost::shared_ptr<TProtocol> output) {
+  Connect* call = static_cast<Connect*>(connectionContext);
+  if (call->callInfo_) {
+    call->callInfo_->eraseCall();
+  }
+  delete call;
+void TClientInfoServerHandler::processContext(void* connectionContext,
+                                              shared_ptr<TTransport> transport) {
+  Connect* call = static_cast<Connect*>(connectionContext);
+  if (call->callInfo_ == NULL) {
+    if (typeid(*(transport.get())) == typeid(TSocket)) {
+      TSocket* tsocket = static_cast<TSocket*>(transport.get());
+      int fd = tsocket->getSocketFD();
+      if (fd < 0) {
+        return;
+      }
+      call->callInfo_ = call->clientInfo_->getConnection(fd, true);
+      assert(call->callInfo_ != NULL);
+      socklen_t len;
+        call->callInfo_->recordAddr(tsocket->getCachedAddress(&len));
+    }
+  }
+void TClientInfoServerHandler::getStatsStrings(vector<string>& result) {
+  result.clear();
+  timespec now;
+  clock_gettime(CLOCK_REALTIME, &now);
+  for (int i = 0; i < clientInfo_.size(); ++i) {
+    TClientInfoConnection* info = clientInfo_.getConnection(i, false);
+    const char* callStr = info->getCall();
+    if (callStr == NULL) {
+      continue;
+    }
+    char addrBuf[INET6_ADDRSTRLEN];
+    const char* addrStr = info->getAddr(addrBuf, sizeof addrBuf);
+    if (addrStr == NULL) {
+      // cerr << "no addr!" << endl;
+      continue;
+    }
+    timespec start;
+    double secs = 0.0;
+    info->getTime(&start);
+    secs = (double)(now.tv_sec - start.tv_sec) + (now.tv_nsec - start.tv_nsec)*0.000000001;
+    char buf[256];
+    snprintf(buf, sizeof buf, "%d %s %s %.3f %llu", i, addrStr, callStr, secs,
+             (unsigned long long)info->getNCalls());
+    result.push_back(buf);
+  }
+void* TClientInfoCallHandler::getContext(const char* fn_name, void* serverContext) {
+  if (serverContext) {
+    TClientInfoConnection* callInfo =  static_cast<TClientInfoServerHandler::Connect*>(serverContext)->callInfo_;
+    if (callInfo != NULL) {
+      callInfo->recordCall(fn_name);
+    }
+  }
+  return NULL;
+} } } // namespace apache::thrift::server
diff --git a/contrib/fb303/TClientInfo.h b/contrib/fb303/TClientInfo.h
new file mode 100644
index 0000000..9b2d284
--- /dev/null
+++ b/contrib/fb303/TClientInfo.h
@@ -0,0 +1,320 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// for inet_ntop --
+#include <arpa/inet.h>
+#include <server/TServer.h>
+#include <transport/TSocket.h>
+#include <concurrency/Mutex.h>
+namespace apache { namespace thrift { namespace server {
+using namespace apache::thrift;
+using namespace apache::thrift::transport;
+using namespace apache::thrift::concurrency;
+using boost::shared_ptr;
+using std::string;
+using std::vector;
+ * StableVector -- a minimal vector class where growth is automatic and
+ * vector elements never move as the vector grows.  Allocates new space
+ * as needed, but does not copy old values.
+ *
+ * A level vector stores a list of storage vectors containing the actual
+ * elements.  Levels are added as needed, doubling in size each time.
+ * Locking is only done when a level is added.  Access is amortized
+ * constant time.
+ */
+template <typename T>
+class StableVector {
+  /// The initial allocation as an exponent of 2
+  static const uint32_t kInitialSizePowOf2 = 10;
+  /// The initial allocation size
+  static const uint32_t kInitialVectorSize = 1 << kInitialSizePowOf2;
+  /// This bound is guaranteed not to be exceeded on 64-bit archs
+  static const int kMaxLevels = 64;
+  /// Values are kept in one or more of these
+  typedef vector<T> Vect;
+  /// One or more value vectors are kept in one of these
+  typedef vector<Vect*> LevelVector;
+  Mutex mutex_;
+  /// current size
+  size_t size_;
+  _Atomic_word vectLvl_;
+  LevelVector vects_;
+ public:
+  /**
+   * Constructor -- initialize the level vector and allocate the
+   * initial storage vector
+   */
+  StableVector()
+    : size_(0) 
+    , vectLvl_(0) {
+    vects_.reserve(kMaxLevels);
+    Vect* storageVector(new Vect(1 << kInitialSizePowOf2));
+    vects_.push_back(storageVector);
+  }
+ private:
+  /**
+   * make sure the requested number of storage levels have been allocated.
+   */
+  void expand(uint32_t level) {
+    // we need the guard to insure that we only allocate once.
+    Guard g(mutex_);
+    while (level > vectLvl_) {
+      Vect* levelVect(new Vect(1 << (vectLvl_ + kInitialSizePowOf2)));
+      vects_.push_back(levelVect);
+      // we need to make sure this is done after levelVect is inserted
+      // (what we want is effectively a memory barrier here).
+      __gnu_cxx::__atomic_add(&vectLvl_, 1);
+    }
+  }
+  /**
+   * Given an index, determine which level and element of that level is
+   * required.  Grows if needed.
+   */
+  void which(uint32_t n, uint32_t* vno, uint32_t* idx) {
+    if (n >= size_) {
+      size_ = n + 1;
+    }
+    if (n < kInitialVectorSize) {
+      *idx = n;
+      *vno = 0;
+    } else {
+      uint32_t upper = n >> kInitialSizePowOf2;
+      *vno = CHAR_BIT*sizeof(upper) - __builtin_clz(upper);
+      *idx = n - (1 << (*vno + kInitialSizePowOf2 - 1));
+      if (*vno > vectLvl_) {
+        expand(*vno);
+      }
+    }
+  }
+ public:
+  /**
+   * Given an index, return a reference to that element, perhaps after
+   * allocating additional space.
+   *
+   * @param n a positive integer
+   */
+  T& operator[](uint32_t n) {
+    uint32_t vno;
+    uint32_t idx;
+    which(n, &vno, &idx);
+    return (*vects_[vno])[idx];
+  }
+  /**
+   * Return the present size of the vector.
+   */
+  size_t size() const { return size_; }
+ * This class embodies the representation of a single connection during
+ * processing.  We'll keep one of these per file descriptor in TClientInfo.
+ */
+class TClientInfoConnection {
+ public:
+  const static int kNameLen = 32;
+ private:
+  typedef union IPAddrUnion {
+    sockaddr_in ipv4;
+    sockaddr_in6 ipv6;
+  };
+  char call_[kNameLen];            ///< The name of the thrift call
+  IPAddrUnion addr_;               ///< The client's IP address
+  timespec time_;                  ///< Time processing started
+  uint64_t ncalls_;                ///< # of calls processed
+ public:
+  /**
+   * Constructor; insure that no client address or thrift call name is
+   * represented.
+   */
+  TClientInfoConnection();
+  /**
+   * A connection has been made; record its address.  Since this is the
+   * first we'll know of a connection we start the timer here as well.
+   */
+  void recordAddr(const sockaddr* addr);
+  /**
+   * Mark the address as empty/unknown.
+   */
+  void eraseAddr();
+  /**
+   * Return a string representing the present address, or NULL if none.
+   * Copies the string into the buffer provided.
+   */
+  const char* getAddr(char* buf, int len) const;
+  /**
+   * A call has been made on this connection; record its name.  Since this is
+   * called for every thrift call processed, we also do our call count here.
+   */ 
+  void recordCall(const char* name);
+  /**
+   * Invoked when processing has ended to clear the call name.
+   */
+  void eraseCall();
+  /**
+   * Return as string the thrift call either currently being processed or
+   * most recently processed if the connection is still open for additonal
+   * calls.  Returns NULL if a call hasn't been made yet or processing
+   * has ended.
+   */
+  const char* getCall() const;
+  /**
+   * Get the timespec for the start of this connection (specifically, when
+   * recordAddr() was first called).
+   */
+  void getTime(timespec* time) const;
+  /**
+   * Return the number of calls made on this connection.
+   */
+  uint64_t getNCalls() const;
+ private:
+  void initTime();
+ * Store for info about a server's clients -- specifically, the client's IP
+ * address and the call it is executing.  This information is indexed by
+ * socket file descriptor and in the present implementation is updated
+ * asynchronously, so it may only approximate reality.
+ */
+class TClientInfo {
+ private:
+  StableVector<TClientInfoConnection> info_;
+ public:
+  /**
+   * Return the info object for a given file descriptor.  If "grow" is true
+   * extend the info vector if required (such as for a file descriptor not seen
+   * before).  If "grow" is false and the info vector isn't large enough,
+   * or if "fd" is negative, return NULL.
+   */
+  TClientInfoConnection* getConnection(int fd, bool grow);
+  size_t size() const;
+ * This derivation of TServerEventHandler encapsulates the main status vector
+ * and provides context to the server's processing loop via overrides.
+ * Together with TClientInfoCallHandler (derived from TProcessorEventHandler) 
+ * it integrates client info collection into the server.
+ */
+class TClientInfoServerHandler : public TServerEventHandler {
+ private:
+  TClientInfo clientInfo_;
+ public:
+  /**
+   * One of these is constructed for each open connection/descriptor and links
+   * to both the status vector (clientInfo_) and that descriptor's entry
+   * within it.
+   */
+  struct Connect {
+    TClientInfo* clientInfo_;
+    TClientInfoConnection* callInfo_;
+    explicit Connect(TClientInfo* clientInfo)
+      : clientInfo_(clientInfo)
+      , callInfo_(NULL) {
+    }
+  };
+  /**
+   * Generate processor context; we don't know what descriptor we belong to
+   * yet -- we'll get hooked up in contextProcess(). 
+   */
+  void* createContext(boost::shared_ptr<TProtocol> input,
+                      boost::shared_ptr<TProtocol> output);
+  /**
+   * Mark our slot as unused and delete the context created in createContext().
+   */
+  void deleteContext(void* processorContext,
+                     boost::shared_ptr<TProtocol> input,
+                     boost::shared_ptr<TProtocol> output);
+  /**
+   * Called in the processing loop just before the server invokes the
+   * processor itself, on the first call we establish which descriptor
+   * we correspond to and set it to that socket's peer IP address.  This
+   * also has the side effect of initializing call counting and connection
+   * timing.  We won't know which call we're handling until the handler
+   * first gets called in TClientInfoCallHandler::getContext().
+   */
+  void processContext(void* processorContext,
+                      shared_ptr<TTransport> transport);
+  /**
+   * Get status report for server in the form of a vector of strings.
+   * Each active client appears as one string in the format:
+   *
+   *
+   * where "FD" is the file descriptor for the client's socket, "IPADDR"
+   * is the IP address (as reported by accept()), "CALLNAME" is the
+   * current or most recent Thrift function name, "DURATION" is the
+   * duration of the connection, while NCALLS is the number of Thrift
+   * calls made since the connection was made.  A single space separates
+   * fields.
+   */
+  void getStatsStrings(vector<string>& result);
+ * This class derives from TProcessorEventHandler to gain access to the
+ * function name for the current Thrift call.  We need two versions of
+ * this -- TClientInfoCallStatsHandler is the other -- since in the latter
+ * case we pass through to TFunctionStatHandler to perform Thrift call
+ * stats.
+ */
+class TClientInfoCallHandler : public TProcessorEventHandler {
+ public:
+  virtual void* getContext(const char* fn_name, void* serverContext);
+} } } // namespace apache::thrift::server
diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h
index 7858166..16b46df 100644
--- a/lib/cpp/src/TProcessor.h
+++ b/lib/cpp/src/TProcessor.h
@@ -44,7 +44,7 @@
    * The return value is passed to all other callbacks
    * for that function invocation.
-  virtual void* getContext(const char* fn_name) { return NULL; }
+  virtual void* getContext(const char* fn_name, void* serverContext) { return NULL; }
    * Expected to free resources associated with a context.
@@ -112,10 +112,12 @@
   virtual ~TProcessor() {}
   virtual bool process(boost::shared_ptr<protocol::TProtocol> in,
-                       boost::shared_ptr<protocol::TProtocol> out) = 0;
+                       boost::shared_ptr<protocol::TProtocol> out,
+                       void* connectionContext) = 0;
-  bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> io) {
-    return process(io, io);
+  bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> io,
+               void* connectionContext) {
+    return process(io, io, connectionContext);
   boost::shared_ptr<TProcessorEventHandler> getEventHandler() {
diff --git a/lib/cpp/src/processor/PeekProcessor.cpp b/lib/cpp/src/processor/PeekProcessor.cpp
index c721861..076975b 100644
--- a/lib/cpp/src/processor/PeekProcessor.cpp
+++ b/lib/cpp/src/processor/PeekProcessor.cpp
@@ -58,7 +58,8 @@
 bool PeekProcessor::process(boost::shared_ptr<TProtocol> in,
-                            boost::shared_ptr<TProtocol> out) {
+                            boost::shared_ptr<TProtocol> out,
+                            void* connectionContext) {
   std::string fname;
   TMessageType mtype;
@@ -100,7 +101,7 @@
   // Done peeking at variables
-  bool ret = actualProcessor_->process(pipedProtocol_, out);
+  bool ret = actualProcessor_->process(pipedProtocol_, out, connectionContext);
   return ret;
diff --git a/lib/cpp/src/processor/PeekProcessor.h b/lib/cpp/src/processor/PeekProcessor.h
index 0f7c016..cb703f6 100644
--- a/lib/cpp/src/processor/PeekProcessor.h
+++ b/lib/cpp/src/processor/PeekProcessor.h
@@ -53,7 +53,8 @@
   void setTargetTransport(boost::shared_ptr<apache::thrift::transport::TTransport> targetTransport);
   virtual bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> in,
-                       boost::shared_ptr<apache::thrift::protocol::TProtocol> out);
+                       boost::shared_ptr<apache::thrift::protocol::TProtocol> out,
+                       void* connectionContext);
   // The following three functions can be overloaded by child classes to
   // achieve desired peeking behavior
diff --git a/lib/cpp/src/processor/StatsProcessor.h b/lib/cpp/src/processor/StatsProcessor.h
index 820b3ad..8600c6b 100644
--- a/lib/cpp/src/processor/StatsProcessor.h
+++ b/lib/cpp/src/processor/StatsProcessor.h
@@ -39,7 +39,9 @@
   virtual ~StatsProcessor() {};
-  virtual bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr<apache::thrift::protocol::TProtocol> poprot) {
+  virtual bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> piprot,
+                       boost::shared_ptr<apache::thrift::protocol::TProtocol> poprot,
+                       void* serverContext) {
     piprot_ = piprot;
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 85fe265..4245d5e 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -56,7 +56,7 @@
   void run() {
     try {
-      while (processor_->process(input_, output_)) {
+      while (processor_->process(input_, output_, NULL)) {
         if (!input_->getTransport()->peek()) {
@@ -293,7 +293,7 @@
     } else {
       try {
         // Invoke the processor
-        server_->getProcessor()->process(inputProtocol_, outputProtocol_);
+        server_->getProcessor()->process(inputProtocol_, outputProtocol_, NULL);
       } catch (TTransportException &ttx) {
         GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h
index 5c4c588..4dddfea 100644
--- a/lib/cpp/src/server/TServer.h
+++ b/lib/cpp/src/server/TServer.h
@@ -57,14 +57,33 @@
    * Called when a new client has connected and is about to being processing.
-  virtual void clientBegin(boost::shared_ptr<TProtocol> /* input */,
-                           boost::shared_ptr<TProtocol> /* output */) {}
+  virtual void* createContext(boost::shared_ptr<TProtocol> input,
+                              boost::shared_ptr<TProtocol> output) {
+    (void)input;
+    (void)output;
+    return NULL;
+  }
-   * Called when a client has finished making requests.
+   * Called when a client has finished request-handling to delete server
+   * context.
-  virtual void clientEnd(boost::shared_ptr<TProtocol> /* input */,
-                         boost::shared_ptr<TProtocol> /* output */) {}
+  virtual void deleteContext(void* serverContext,
+                             boost::shared_ptr<TProtocol>input,
+                             boost::shared_ptr<TProtocol>output) {
+    (void)serverContext;
+    (void)input;
+    (void)output;
+  }
+  /**
+   * Called when a client is about to call the processor.
+   */
+  virtual void processContext(void* serverContext,
+                              boost::shared_ptr<TTransport> transport) {
+    (void)serverContext;
+    (void)transport;
diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp
index 394ce21..438a587 100644
--- a/lib/cpp/src/server/TSimpleServer.cpp
+++ b/lib/cpp/src/server/TSimpleServer.cpp
@@ -63,13 +63,18 @@
       outputTransport = outputTransportFactory_->getTransport(client);
       inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
       outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
+      void* connectionContext = NULL;
       if (eventHandler_ != NULL) {
-        eventHandler_->clientBegin(inputProtocol, outputProtocol);
+        connectionContext = eventHandler_->createContext(inputProtocol, outputProtocol);
       try {
-        while (processor_->process(inputProtocol, outputProtocol)) {
-          // Peek ahead, is the remote side closed?
-          if (!inputTransport->peek()) {
+        for (;;) {
+          if (eventHandler_ != NULL) {
+            eventHandler_->processContext(connectionContext, client);
+          }
+          if (!processor_->process(inputProtocol, outputProtocol, connectionContext) ||
+              // Peek ahead, is the remote side closed?
+              !inputProtocol->getTransport()->peek()) {
@@ -79,7 +84,7 @@
         cerr << "TSimpleServer exception: " << tx.what() << endl;
       if (eventHandler_ != NULL) {
-        eventHandler_->clientEnd(inputProtocol, outputProtocol);
+        eventHandler_->deleteContext(connectionContext, inputProtocol, outputProtocol);
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
index 6eea3db..18319be 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -40,11 +40,13 @@
   Task(TThreadPoolServer &server,
        shared_ptr<TProcessor> processor,
        shared_ptr<TProtocol> input,
-       shared_ptr<TProtocol> output) :
+       shared_ptr<TProtocol> output,
+       shared_ptr<TTransport> transport) :
-    output_(output) {
+    output_(output),
+    transport_(transport) {
   ~Task() {}
@@ -52,12 +54,17 @@
   void run() {
     boost::shared_ptr<TServerEventHandler> eventHandler =
+    void* connectionContext = NULL;
     if (eventHandler != NULL) {
-      eventHandler->clientBegin(input_, output_);
+      connectionContext = eventHandler->createContext(input_, output_);
     try {
-      while (processor_->process(input_, output_)) {
-        if (!input_->getTransport()->peek()) {
+      for (;;) {
+        if (eventHandler != NULL) {
+          eventHandler->processContext(connectionContext, transport_);
+        }
+        if (!processor_->process(input_, output_, connectionContext) ||
+            !input_->getTransport()->peek()) {
@@ -78,7 +85,7 @@
     if (eventHandler != NULL) {
-      eventHandler->clientEnd(input_, output_);
+      eventHandler->deleteContext(connectionContext, input_, output_);
     try {
@@ -101,7 +108,7 @@
   shared_ptr<TProcessor> processor_;
   shared_ptr<TProtocol> input_;
   shared_ptr<TProtocol> output_;
+  shared_ptr<TTransport> transport_;
 TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
@@ -167,7 +174,7 @@
       outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
       // Add to threadmanager pool
-      threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol)), timeout_);
+      threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol, client)), timeout_);
     } catch (TTransportException& ttx) {
       if (inputTransport != NULL) { inputTransport->close(); }
diff --git a/lib/cpp/src/server/TThreadedServer.cpp b/lib/cpp/src/server/TThreadedServer.cpp
index cc30f8f..11718ca 100644
--- a/lib/cpp/src/server/TThreadedServer.cpp
+++ b/lib/cpp/src/server/TThreadedServer.cpp
@@ -42,11 +42,13 @@
   Task(TThreadedServer& server,
        shared_ptr<TProcessor> processor,
        shared_ptr<TProtocol> input,
-       shared_ptr<TProtocol> output) :
+       shared_ptr<TProtocol> output,
+       shared_ptr<TTransport> transport) :
-    output_(output) {
+    output_(output),
+    transport_(transport) {
   ~Task() {}
@@ -54,12 +56,17 @@
   void run() {
     boost::shared_ptr<TServerEventHandler> eventHandler =
+    void* connectionContext = NULL;
     if (eventHandler != NULL) {
-      eventHandler->clientBegin(input_, output_);
+      connectionContext = eventHandler->createContext(input_, output_);
     try {
-      while (processor_->process(input_, output_)) {
-        if (!input_->getTransport()->peek()) {
+      for (;;) {
+        if (eventHandler != NULL) {
+          eventHandler->processContext(connectionContext, transport_);
+        }
+        if (!processor_->process(input_, output_, connectionContext) ||
+            !input_->getTransport()->peek()) {
@@ -73,7 +80,7 @@
       GlobalOutput("TThreadedServer uncaught exception.");
     if (eventHandler != NULL) {
-      eventHandler->clientEnd(input_, output_);
+      eventHandler->deleteContext(connectionContext, input_, output_);
     try {
@@ -107,6 +114,7 @@
   shared_ptr<TProcessor> processor_;
   shared_ptr<TProtocol> input_;
   shared_ptr<TProtocol> output_;
+  shared_ptr<TTransport> transport_;
@@ -173,7 +181,8 @@
       TThreadedServer::Task* task = new TThreadedServer::Task(*this,
-                                                              outputProtocol);
+                                                              outputProtocol,
+                                                              client);
       // Create a task
       shared_ptr<Runnable> runnable =
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index 0b41694..40841ac 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -966,7 +966,7 @@
     // bad form to use exceptions for flow control but there is really
     // no other way around it
     try {
-      processor_->process(inputProtocol, outputProtocol);
+      processor_->process(inputProtocol, outputProtocol, NULL);
       if ( (numEvents > 0) && (numProcessed == numEvents)) {
@@ -998,7 +998,7 @@
     // bad form to use exceptions for flow control but there is really
     // no other way around it
     try {
-      processor_->process(inputProtocol, outputProtocol);
+      processor_->process(inputProtocol, outputProtocol, NULL);
       if (curChunk != inputTransport_->getCurChunk()) {
diff --git a/lib/cpp/src/transport/TServerSocket.cpp b/lib/cpp/src/transport/TServerSocket.cpp
index 836f6ba..90a27ce 100644
--- a/lib/cpp/src/transport/TServerSocket.cpp
+++ b/lib/cpp/src/transport/TServerSocket.cpp
@@ -393,7 +393,8 @@
   if (recvTimeout_ > 0) {
+  client->setCachedAddress((sockaddr*) &clientAddress, size);
   return client;
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index 951ddcf..ee76c3f 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -78,6 +78,7 @@
   maxRecvRetries_(5) {
   recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
   recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+  cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
 TSocket::TSocket() :
@@ -94,6 +95,7 @@
   maxRecvRetries_(5) {
   recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
   recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+  cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
 TSocket::TSocket(int socket) :
@@ -110,6 +112,7 @@
   maxRecvRetries_(5) {
   recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
   recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+  cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
 TSocket::~TSocket() {
@@ -273,6 +276,8 @@
   // Set socket back to normal mode (blocking)
   fcntl(socket_, F_SETFL, flags);
+  setCachedAddress(res->ai_addr, res->ai_addrlen);
 void TSocket::open() {
@@ -600,22 +605,29 @@
 std::string TSocket::getPeerHost() {
   if (peerHost_.empty()) {
     struct sockaddr_storage addr;
-    socklen_t addrLen = sizeof(addr);
+    struct sockaddr* addrPtr;
+    socklen_t addrLen;
     if (socket_ < 0) {
       return host_;
-    int rv = getpeername(socket_, (sockaddr*) &addr, &addrLen);
+    addrPtr = getCachedAddress(&addrLen);
-    if (rv != 0) {
-      return peerHost_;
+    if (addrPtr == NULL) {
+      addrLen = sizeof(addr);
+      if (getpeername(socket_, (sockaddr*) &addr, &addrLen) != 0) {
+        return peerHost_;
+      }
+      addrPtr = (sockaddr*)&addr;
+      setCachedAddress(addrPtr, addrLen);
     char clienthost[NI_MAXHOST];
     char clientservice[NI_MAXSERV];
-    getnameinfo((sockaddr*) &addr, addrLen,
+    getnameinfo((sockaddr*) addrPtr, addrLen,
                 clienthost, sizeof(clienthost),
                 clientservice, sizeof(clientservice), 0);
@@ -627,22 +639,29 @@
 std::string TSocket::getPeerAddress() {
   if (peerAddress_.empty()) {
     struct sockaddr_storage addr;
-    socklen_t addrLen = sizeof(addr);
+    struct sockaddr* addrPtr;
+    socklen_t addrLen;
     if (socket_ < 0) {
       return peerAddress_;
-    int rv = getpeername(socket_, (sockaddr*) &addr, &addrLen);
+    addrPtr = getCachedAddress(&addrLen);
-    if (rv != 0) {
-      return peerAddress_;
+    if (addrPtr == NULL) {
+      addrLen = sizeof(addr);
+      if (getpeername(socket_, (sockaddr*) &addr, &addrLen) != 0) {
+        return peerAddress_;
+      }
+      addrPtr = (sockaddr*)&addr;
+      setCachedAddress(addrPtr, addrLen);
     char clienthost[NI_MAXHOST];
     char clientservice[NI_MAXSERV];
-    getnameinfo((sockaddr*) &addr, addrLen,
+    getnameinfo(addrPtr, addrLen,
                 clienthost, sizeof(clienthost),
                 clientservice, sizeof(clientservice),
@@ -658,6 +677,37 @@
   return peerPort_;
+void TSocket::setCachedAddress(const sockaddr* addr, socklen_t len) {
+  switch (addr->sa_family) {
+  case AF_INET:
+    if (len == sizeof(sockaddr_in)) {
+      memcpy((void*)&cachedPeerAddr_.ipv4, (void*)addr, len);
+    }
+    break;
+  case AF_INET6:
+    if (len == sizeof(sockaddr_in6)) {
+      memcpy((void*)&cachedPeerAddr_.ipv6, (void*)addr, len);
+    }
+    break;
+  }
+sockaddr* TSocket::getCachedAddress(socklen_t* len) const {
+  switch (cachedPeerAddr_.ipv4.sin_family) {
+  case AF_INET:
+    *len = sizeof(sockaddr_in);
+    return (sockaddr*) &cachedPeerAddr_.ipv4;
+  case AF_INET6:
+    *len = sizeof(sockaddr_in6);
+    return (sockaddr*) &cachedPeerAddr_.ipv6;
+  default:
+    return NULL;
+  }
 bool TSocket::useLowMinRto_ = false;
 void TSocket::setUseLowMinRto(bool useLowMinRto) {
   useLowMinRto_ = useLowMinRto;
diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h
index f195438..47a702d 100644
--- a/lib/cpp/src/transport/TSocket.h
+++ b/lib/cpp/src/transport/TSocket.h
@@ -193,6 +193,18 @@
   int getPeerPort();
+   * Returns the underlying socket file descriptor.
+   */
+  int getSocketFD() {
+    return socket_;
+  }
+  /*
+   * Returns a cached copy of the peer address.
+   */
+  sockaddr* getCachedAddress(socklen_t* len) const;
+  /**
    * Sets whether to use a low minimum TCP retransmission timeout.
   static void setUseLowMinRto(bool useLowMinRto);
@@ -211,6 +223,12 @@
   /** connect, called by open */
   void openConnection(struct addrinfo *res);
+  /**
+   * Set a cache of the peer address (used when trivially available: e.g.
+   * accept() or connect()). Only caches IPV4 and IPV6; unset for others.
+   */
+  void setCachedAddress(const sockaddr* addr, socklen_t len);
   /** Host to connect to */
   std::string host_;
@@ -256,6 +274,15 @@
   /** Recv timeout timeval */
   struct timeval recvTimeval_;
+  /** Cached peer address */
+  union {
+    sockaddr_in ipv4;
+    sockaddr_in6 ipv6;
+  } cachedPeerAddr_;
+  /** Connection start time */
+  timespec startTime_;
   /** Whether to use low minimum TCP retransmission timeout */
   static bool useLowMinRto_;
diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp
index 685957a..d30475b 100644
--- a/test/cpp/src/TestServer.cpp
+++ b/test/cpp/src/TestServer.cpp
@@ -289,7 +289,7 @@
 class TestProcessorEventHandler : public TProcessorEventHandler {
-  virtual void* getContext(const char* fn_name) {
+  virtual void* getContext(const char* fn_name, void* serverContext) {
     return new std::string(fn_name);
   virtual void freeContext(void* ctx, const char* fn_name) {