cpp: TNonblockingServer: Allow unrun tasks to expire after a time limit
Enhance ThreadManager to allow a expiration time interval to be assigned
to tasks, and expire those tasks after that time limit has passed.
Enhance TNonblockingServer to utilize this capability so it can be used
for overload resilience.
Note: expired entries are only removed from the queue head, so the
mechanism in ThreadManager may not do what you expect if you have
heterogeneous expiration times. That's not an issue with
TNonblockingServer (which will give all tasks the same limit) and might
not be in other cases where most tasks have the same limit and the rest
execute quickly. A full-up timeout queue would be more complex and have
greater overhead than that used here. It's unnecessary for the task at
hand so I didn't go that route...
The TNonblocking interface is simple: a setTaskExpireTime() accepts a
64-bit millisecond argument. 0 means infinite. A getTaskExpireTime()
accessor completes the interface.
The ThreadManager interface involves an added argument to add() for the
expiration interval and a setExpireCallback() function for setting a
callback that is called for expired tasks (for this project this is
necessary to shut down the associated connection).
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@920673 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index eb071a9..649994d 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -770,6 +770,16 @@
   }
 }
 
+void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
+  threadManager_ = threadManager;
+  if (threadManager != NULL) {
+    threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
+    threadPoolProcessing_ = true;
+  } else {
+    threadPoolProcessing_ = false;
+  }
+}
+
 bool  TNonblockingServer::serverOverloaded() {
   size_t activeConnections = numTConnections_ - connectionStack_.size();
   if (numActiveProcessors_ > maxActiveProcessors_ ||
@@ -807,6 +817,14 @@
   return false;
 }
 
+void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
+  TConnection* connection =
+    static_cast<TConnection::Task*>(task.get())->getTConnection();
+  assert(connection && connection->getServer()
+	 && connection->getState() == APP_WAIT_TASK);
+  connection->forceClose();
+}
+
 /**
  * Main workhorse function, starts up the server listening on a port and
  * loops over the libevent handler.
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index cf5aa18..41a2bf5 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -113,6 +113,9 @@
   /// Limit for number of open connections
   size_t maxConnections_;
 
+  /// Time in milliseconds before an unperformed task expires (0 == infinite).
+  int64_t taskExpireTime_;
+
   /**
    * Hysteresis for overload state.  This is the fraction of the overload
    * value that needs to be reached before the overload state is cleared;
@@ -173,6 +176,7 @@
     connectionStackLimit_(CONNECTION_STACK_LIMIT),
     maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
     maxConnections_(MAX_CONNECTIONS),
+    taskExpireTime_(0),
     overloadHysteresis_(0.8),
     overloadAction_(T_OVERLOAD_NO_ACTION),
     idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
@@ -194,6 +198,7 @@
     connectionStackLimit_(CONNECTION_STACK_LIMIT),
     maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
     maxConnections_(MAX_CONNECTIONS),
+    taskExpireTime_(0),
     overloadHysteresis_(0.8),
     overloadAction_(T_OVERLOAD_NO_ACTION),
     idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
@@ -224,12 +229,13 @@
     connectionStackLimit_(CONNECTION_STACK_LIMIT),
     maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
     maxConnections_(MAX_CONNECTIONS),
+    taskExpireTime_(0),
     overloadHysteresis_(0.8),
     overloadAction_(T_OVERLOAD_NO_ACTION),
     idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
     overloaded_(false),
     nConnectionsDropped_(0),
-    nTotalConnectionsDropped_(0)  {
+    nTotalConnectionsDropped_(0) {
     setInputTransportFactory(inputTransportFactory);
     setOutputTransportFactory(outputTransportFactory);
     setInputProtocolFactory(inputProtocolFactory);
@@ -239,10 +245,7 @@
 
   ~TNonblockingServer() {}
 
-  void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
-    threadManager_ = threadManager;
-    threadPoolProcessing_ = (threadManager != NULL);
-  }
+  void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
 
   boost::shared_ptr<ThreadManager> getThreadManager() {
     return threadManager_;
@@ -271,7 +274,7 @@
   }
 
   void addTask(boost::shared_ptr<Runnable> task) {
-    threadManager_->add(task);
+    threadManager_->add(task, 0LL, taskExpireTime_);
   }
 
   event_base* getEventBase() const {
@@ -406,6 +409,24 @@
   }
 
   /**
+   * Get the time in milliseconds after which a task expires (0 == infinite).
+   *
+   * @return a 64-bit time in milliseconds.
+   */
+  int64_t getTaskExpireTime() const {
+    return taskExpireTime_;
+  }
+
+  /**
+   * Set the time in milliseconds after which a task expires (0 == infinite).
+   *
+   * @param taskExpireTime a 64-bit time in milliseconds.
+   */
+  void setTaskExpireTime(int64_t taskExpireTime) {
+    taskExpireTime_ = taskExpireTime;
+  }
+
+  /**
    * Determine if the server is currently overloaded.
    * This function checks the maximums for open connections and connections
    * currently in processing, and sets an overload condition if they are
@@ -463,6 +484,14 @@
   void returnConnection(TConnection* connection);
 
   /**
+   * Callback function that the threadmanager calls when a task reaches
+   * its expiration time.  It is needed to clean up the expired connection.
+   *
+   * @param task the runnable associated with the expired task.
+   */
+  void expireClose(boost::shared_ptr<Runnable> task);
+
+  /**
    * C-callable event handler for listener events.  Provides a callback
    * that libevent can understand which invokes server->handleEvent().
    *