Modified facebook::thrift::concurrency::Monitor.wait:
Throw TimedOutException on wait timeout so caller can distinguish between timeout and event.
Modified facebook::thrift::concurrency::PthreadThread.start:
Throw SystemrResourceException on any pthread_* function call failure rather than asserting 0.
Added facebook::thrift::concurrency::Thread.id() and facebook::thrift::concurrency::ThreadFactory.currentThreadId():
Return thread-id of thread and current thread respectively. Needed for reentrancy tests in ThreadManager
Added facebook::thrift::concurrency::ThreadManager.pendingTaskCountMaxN
Modified facebook::thrift::concurrency::ThreadManager.add():
Now support a maximum pending task count and block if the current pending task count is max.
If timeout is specified for add, TimedOutException is thrown if pending task count doesn't decrease
in the timeout interval. If add() is called by a ThreadManager worker thread and the task cannot
be added, a TooManyPendingTasksException is thrown rather than blocking, since deadlocks can ensue
if worker threads block waiting for works threads to complete tasks.
Reviewed By: mcslee, aditya
Revert Plan: revertible
Test Plan: concurrency/test/ThreadManagerTests.h
run concurrency-test thread-manager
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665120 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/Exception.h b/lib/cpp/src/concurrency/Exception.h
index 7735b2c..735cd87 100644
--- a/lib/cpp/src/concurrency/Exception.h
+++ b/lib/cpp/src/concurrency/Exception.h
@@ -10,7 +10,7 @@
#include <exception>
#include <Thrift.h>
-namespace facebook { namespace thrift { namespace concurrency {
+namespace facebook { namespace thrift { namespace concurrency {
class NoSuchTaskException : public facebook::thrift::TException {};
@@ -22,6 +22,16 @@
class TimedOutException : public facebook::thrift::TException {};
+class TooManyPendingTasksException : public facebook::thrift::TException {};
+
+class SystemResourceException : public facebook::thrift::TException {
+public:
+ SystemResourceException() {}
+
+ SystemResourceException(const std::string& message) :
+ TException(message) {}
+};
+
}}} // facebook::thrift::concurrency
#endif // #ifndef _THRIFT_CONCURRENCY_EXCEPTION_H_
diff --git a/lib/cpp/src/concurrency/Monitor.cpp b/lib/cpp/src/concurrency/Monitor.cpp
index 2443a6e..0177a0c 100644
--- a/lib/cpp/src/concurrency/Monitor.cpp
+++ b/lib/cpp/src/concurrency/Monitor.cpp
@@ -4,8 +4,8 @@
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
-#include "Monitor.h"
-#include "Exception.h"
+#include "Monitor.h"
+#include "Exception.h"
#include "Util.h"
#include <assert.h>
@@ -15,11 +15,11 @@
#include <pthread.h>
-namespace facebook { namespace thrift { namespace concurrency {
+namespace facebook { namespace thrift { namespace concurrency {
/**
* Monitor implementation using the POSIX pthread library
- *
+ *
* @author marc
* @version $Id:$
*/
@@ -30,16 +30,18 @@
Impl() :
mutexInitialized_(false),
condInitialized_(false) {
-
- try {
- int ret = pthread_mutex_init(&pthread_mutex_, NULL);
- assert(ret == 0);
+
+ if(pthread_mutex_init(&pthread_mutex_, NULL) == 0) {
mutexInitialized_ = true;
- ret = pthread_cond_init(&pthread_cond_, NULL);
- assert(ret == 0);
- condInitialized_ = true;
- } catch(...) {
+
+ if(pthread_cond_init(&pthread_cond_, NULL) == 0) {
+ condInitialized_ = true;
+ }
+ }
+
+ if(!mutexInitialized_ || !condInitialized_) {
cleanup();
+ throw SystemResourceException();
}
}
@@ -65,6 +67,7 @@
&abstime);
if (result == ETIMEDOUT) {
assert(Util::currentTime() >= (now + timeout));
+ throw TimedOutException();
}
}
}
diff --git a/lib/cpp/src/concurrency/Monitor.h b/lib/cpp/src/concurrency/Monitor.h
index 4c62e78..b6a9f71 100644
--- a/lib/cpp/src/concurrency/Monitor.h
+++ b/lib/cpp/src/concurrency/Monitor.h
@@ -7,7 +7,9 @@
#ifndef _THRIFT_CONCURRENCY_MONITOR_H_
#define _THRIFT_CONCURRENCY_MONITOR_H_ 1
-namespace facebook { namespace thrift { namespace concurrency {
+#include "Exception.h"
+
+namespace facebook { namespace thrift { namespace concurrency {
/**
* A monitor is a combination mutex and condition-event. Waiting and
@@ -50,7 +52,7 @@
class Synchronized {
public:
-
+
Synchronized(const Monitor& value) :
monitor_(value) {
monitor_.lock();
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
index 675e95d..e433467 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
@@ -5,6 +5,7 @@
// http://developers.facebook.com/thrift/
#include "PosixThreadFactory.h"
+#include "Exception.h"
#include <assert.h>
#include <pthread.h>
@@ -18,7 +19,7 @@
using namespace boost;
/**
- * The POSIX thread class.
+ * The POSIX thread class.
*
* @author marc
* @version $Id:$
@@ -47,10 +48,10 @@
weak_ptr<PthreadThread> self_;
public:
-
- PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
+
+ PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
pthread_(0),
- state_(uninitialized),
+ state_(uninitialized),
policy_(policy),
priority_(priority),
stackSize_(stackSize) {
@@ -68,32 +69,39 @@
state_ = starting;
pthread_attr_t thread_attr;
- int ret = pthread_attr_init(&thread_attr);
- assert(ret == 0);
+ if(pthread_attr_init(&thread_attr) != 0) {
+ throw SystemResourceException("pthread_attr_init failed");
+ }
- ret = pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE);
- assert(ret == 0);
+ if(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) != 0) {
+ throw SystemResourceException("pthread_attr_setdetachstate failed");
+ }
// Set thread stack size
- ret = pthread_attr_setstacksize(&thread_attr, MB * stackSize_);
- assert(ret == 0);
+ if(pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
+ throw SystemResourceException("pthread_attr_setstacksize failed");
+ }
// Set thread policy
- ret = pthread_attr_setschedpolicy(&thread_attr, policy_);
- assert(ret == 0);
+ if(pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
+ throw SystemResourceException("pthread_attr_setschedpolicy failed");
+ }
struct sched_param sched_param;
sched_param.sched_priority = priority_;
// Set thread priority
- ret = pthread_attr_setschedparam(&thread_attr, &sched_param);
- assert(ret == 0);
+ if(pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
+ throw SystemResourceException("pthread_attr_setschedparam failed");
+ }
// Create reference
shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
*selfRef = self_.lock();
- ret = pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef);
- assert(ret == 0);
+
+ if(pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
+ throw SystemResourceException("pthread_create failed");
+ }
}
void join() {
@@ -103,6 +111,10 @@
}
}
+ id_t id() {
+ return pthread_;
+ }
+
shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
@@ -130,7 +142,7 @@
if (thread->state_ != stopping && thread->state_ != stopped) {
thread->state_ = stopping;
}
-
+
return (void*)0;
}
@@ -187,14 +199,14 @@
public:
- Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
+ Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
policy_(policy),
priority_(priority),
stackSize_(stackSize),
detached_(detached) {}
/**
- * Creates a new POSIX thread to run the runnable object
+ * Creates a new POSIX thread to run the runnable object
*
* @param runnable A runnable object
*/
@@ -211,6 +223,8 @@
PRIORITY priority() const { return priority_; }
+ Thread::id_t currentThreadId() const {return pthread_self();}
+
/**
* Sets priority.
*
@@ -220,7 +234,7 @@
void priority(PRIORITY value) { priority_ = value; }
};
-PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
+PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
@@ -233,4 +247,6 @@
void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { impl_->priority(value); }
+Thread::id_t PosixThreadFactory::currentThreadId() const {return impl_->currentThreadId();}
+
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h
index ede7d79..4e31dc5 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.h
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.h
@@ -57,6 +57,9 @@
// From ThreadFactory;
boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;
+ // From ThreadFactory;
+ Thread::id_t currentThreadId() const;
+
/**
* Sets stack size for created threads
*
diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h
index 96ca668..e928fc4 100644
--- a/lib/cpp/src/concurrency/Thread.h
+++ b/lib/cpp/src/concurrency/Thread.h
@@ -10,12 +10,12 @@
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
-namespace facebook { namespace thrift { namespace concurrency {
+namespace facebook { namespace thrift { namespace concurrency {
class Thread;
/**
- * Minimal runnable class. More or less analogous to java.lang.Runnable.
+ * Minimal runnable class. More or less analogous to java.lang.Runnable.
*
* @author marc
* @version $Id:$
@@ -43,7 +43,7 @@
};
/**
- * Minimal thread class. Returned by thread factory bound to a Runnable object
+ * Minimal thread class. Returned by thread factory bound to a Runnable object
* and ready to start execution. More or less analogous to java.lang.Thread
* (minus all the thread group, priority, mode and other baggage, since that
* is difficult to abstract across platforms and is left for platform-specific
@@ -52,8 +52,11 @@
* @see facebook::thrift::concurrency::ThreadFactory)
*/
class Thread {
-
+
public:
+
+ typedef unsigned long long id_t;
+
virtual ~Thread() {};
/**
@@ -70,6 +73,11 @@
virtual void join() = 0;
/**
+ * Gets the thread's platform-specific ID
+ */
+ virtual id_t id() = 0;
+
+ /**
* Gets the runnable object this thread is hosting
*/
virtual boost::shared_ptr<Runnable> runnable() const { return _runnable; }
@@ -79,6 +87,7 @@
private:
boost::shared_ptr<Runnable> _runnable;
+
};
/**
@@ -90,6 +99,12 @@
public:
virtual ~ThreadFactory() {}
virtual boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const = 0;
+
+ /** Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread */
+
+ static const Thread::id_t unknown_thread_id;
+
+ virtual Thread::id_t currentThreadId() const = 0;
};
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp
index 1631541..7d78edb 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/concurrency/ThreadManager.cpp
@@ -18,14 +18,13 @@
#include <iostream>
#endif //defined(DEBUG)
-namespace facebook { namespace thrift { namespace concurrency {
+namespace facebook { namespace thrift { namespace concurrency {
using namespace boost;
-
/**
* ThreadManager class
- *
+ *
* This class manages a pool of threads. It uses a ThreadFactory to create
* threads. It never actually creates or destroys worker threads, rather
* it maintains statistics on number of idle threads, number of active threads,
@@ -37,10 +36,11 @@
class ThreadManager::Impl : public ThreadManager {
public:
- Impl() :
+ Impl() :
workerCount_(0),
workerMaxCount_(0),
idleCount_(0),
+ pendingTaskCountMax_(0),
state_(ThreadManager::UNINITIALIZED) {}
~Impl() { stop(); }
@@ -56,39 +56,51 @@
}
shared_ptr<ThreadFactory> threadFactory() const {
- Synchronized s(monitor_);
+ Synchronized s(monitor_);
return threadFactory_;
}
-
- void threadFactory(shared_ptr<ThreadFactory> value) {
+
+ void threadFactory(shared_ptr<ThreadFactory> value) {
Synchronized s(monitor_);
threadFactory_ = value;
}
void addWorker(size_t value);
-
+
void removeWorker(size_t value);
-
+
size_t idleWorkerCount() const {
return idleCount_;
}
size_t workerCount() const {
- Synchronized s(monitor_);
+ Synchronized s(monitor_);
return workerCount_;
}
-
+
size_t pendingTaskCount() const {
- Synchronized s(monitor_);
+ Synchronized s(monitor_);
return tasks_.size();
}
size_t totalTaskCount() const {
- Synchronized s(monitor_);
+ Synchronized s(monitor_);
return tasks_.size() + workerCount_ - idleCount_;
}
-
- void add(shared_ptr<Runnable> value);
+
+ size_t pendingTaskCountMax() const {
+ Synchronized s(monitor_);
+ return pendingTaskCountMax_;
+ }
+
+ void pendingTaskCountMax(const size_t value) {
+ Synchronized s(monitor_);
+ pendingTaskCountMax_ = value;
+ }
+
+ bool canSleep();
+
+ void add(shared_ptr<Runnable> value, long long timeout);
void remove(shared_ptr<Runnable> task);
@@ -98,6 +110,8 @@
size_t workerCount_;
size_t workerMaxCount_;
size_t idleCount_;
+ size_t pendingTaskCountMax_;
+
ThreadManager::STATE state_;
shared_ptr<ThreadFactory> threadFactory_;
@@ -110,6 +124,7 @@
friend class ThreadManager::Worker;
std::set<shared_ptr<Thread> > workers_;
std::set<shared_ptr<Thread> > deadWorkers_;
+ std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
};
class ThreadManager::Task : public Runnable {
@@ -151,7 +166,7 @@
};
public:
- Worker(ThreadManager::Impl* manager) :
+ Worker(ThreadManager::Impl* manager) :
manager_(manager),
state_(UNINITIALIZED),
idle_(false) {}
@@ -173,7 +188,7 @@
* execute.
*/
void run() {
- bool active = false;
+ bool active = false;
bool notifyManager = false;
/**
@@ -230,6 +245,13 @@
if (task->state_ == ThreadManager::Task::WAITING) {
task->state_ = ThreadManager::Task::EXECUTING;
}
+
+ /* If we have a pending task max and we just dropped below it, wakeup any
+ thread that might be blocked on add. */
+ if(manager_->pendingTaskCountMax_ != 0 &&
+ manager_->tasks_.size() == manager_->pendingTaskCountMax_ - 1) {
+ manager_->workerMonitor_.notify();
+ }
}
} else {
idle_ = true;
@@ -237,7 +259,7 @@
notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
}
}
-
+
if (task != NULL) {
if (task->state_ == ThreadManager::Task::EXECUTING) {
try {
@@ -248,18 +270,18 @@
}
}
}
-
+
{
- Synchronized s(manager_->workerMonitor_);
+ Synchronized s(manager_->workerMonitor_);
manager_->deadWorkers_.insert(this->thread());
if (notifyManager) {
manager_->workerMonitor_.notify();
}
}
-
+
return;
}
-
+
private:
ThreadManager::Impl* manager_;
friend class ThreadManager::Impl;
@@ -271,7 +293,7 @@
void ThreadManager::Impl::addWorker(size_t value) {
std::set<shared_ptr<Thread> > newThreads;
for (size_t ix = 0; ix < value; ix++) {
- class ThreadManager::Worker;
+ class ThreadManager::Worker;
shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
newThreads.insert(threadFactory_->newThread(worker));
}
@@ -281,15 +303,16 @@
workerMaxCount_ += value;
workers_.insert(newThreads.begin(), newThreads.end());
}
-
+
for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
worker->state_ = ThreadManager::Worker::STARTING;
(*ix)->start();
+ idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->id(), *ix));
}
{
- Synchronized s(workerMonitor_);
+ Synchronized s(workerMonitor_);
while (workerCount_ != workerMaxCount_) {
workerMonitor_.wait();
}
@@ -303,7 +326,7 @@
}
{
- Synchronized s(monitor_);
+ Synchronized s(monitor_);
if (state_ == ThreadManager::UNINITIALIZED) {
if (threadFactory_ == NULL) {
throw InvalidArgumentException();
@@ -325,7 +348,7 @@
}
{
- Synchronized s(monitor_);
+ Synchronized s(monitor_);
if (state_ != ThreadManager::STOPPING &&
state_ != ThreadManager::JOINING &&
state_ != ThreadManager::STOPPED) {
@@ -338,12 +361,12 @@
removeWorker(workerCount_);
}
- // XXX
+ // XXX
// should be able to block here for transition to STOPPED since we're no
// using shared_ptrs
{
- Synchronized s(monitor_);
+ Synchronized s(monitor_);
state_ = ThreadManager::STOPPED;
}
@@ -352,7 +375,7 @@
void ThreadManager::Impl::removeWorker(size_t value) {
std::set<shared_ptr<Thread> > removedThreads;
{
- Synchronized s(monitor_);
+ Synchronized s(monitor_);
if (value > workerMaxCount_) {
throw InvalidArgumentException();
}
@@ -369,7 +392,7 @@
}
{
- Synchronized s(workerMonitor_);
+ Synchronized s(workerMonitor_);
while (workerCount_ != workerMaxCount_) {
workerMonitor_.wait();
@@ -377,19 +400,37 @@
for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
workers_.erase(*ix);
+ idMap_.erase((*ix)->id());
}
-
+
deadWorkers_.clear();
}
}
-
-void ThreadManager::Impl::add(shared_ptr<Runnable> value) {
- Synchronized s(monitor_);
+
+ bool ThreadManager::Impl::canSleep() {
+ const Thread::id_t id = threadFactory_->currentThreadId();
+ return idMap_.find(id) == idMap_.end();
+ }
+
+ void ThreadManager::Impl::add(shared_ptr<Runnable> value, long long timeout) {
+ Synchronized s(monitor_);
if (state_ != ThreadManager::STARTED) {
throw IllegalStateException();
}
+ if(pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
+
+ if(canSleep()) {
+
+ while(pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
+ monitor_.wait(timeout);
+ }
+ } else {
+ throw TooManyPendingTasksException();
+ }
+ }
+
tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
// If idle thread is available notify it, otherwise all worker threads are
@@ -400,7 +441,7 @@
}
void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
- Synchronized s(monitor_);
+ Synchronized s(monitor_);
if (state_ != ThreadManager::STARTED) {
throw IllegalStateException();
}
@@ -409,18 +450,21 @@
class SimpleThreadManager : public ThreadManager::Impl {
public:
- SimpleThreadManager(size_t workerCount=4) :
+ SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) :
workerCount_(workerCount),
+ pendingTaskCountMax_(pendingTaskCountMax),
firstTime_(true) {
}
void start() {
+ ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
ThreadManager::Impl::start();
addWorker(workerCount_);
}
private:
const size_t workerCount_;
+ const size_t pendingTaskCountMax_;
bool firstTime_;
Monitor monitor_;
};
@@ -430,8 +474,9 @@
return shared_ptr<ThreadManager>(new ThreadManager::Impl());
}
-shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count) {
- return shared_ptr<ThreadManager>(new SimpleThreadManager(count));
+shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) {
+ return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
}
}}} // facebook::thrift::concurrency
+
diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h
index f0c745f..19f77cc 100644
--- a/lib/cpp/src/concurrency/ThreadManager.h
+++ b/lib/cpp/src/concurrency/ThreadManager.h
@@ -11,7 +11,7 @@
#include <sys/types.h>
#include "Thread.h"
-namespace facebook { namespace thrift { namespace concurrency {
+namespace facebook { namespace thrift { namespace concurrency {
/**
* Thread Pool Manager and related classes
@@ -56,7 +56,7 @@
* Stops the thread manager. Aborts all remaining unprocessed task, shuts
* down all created worker threads, and realeases all allocated resources.
* This method blocks for all worker threads to complete, thus it can
- * potentially block forever if a worker thread is running a task that
+ * potentially block forever if a worker thread is running a task that
* won't terminate.
*/
virtual void stop() = 0;
@@ -76,7 +76,7 @@
STOPPING,
STOPPED
};
-
+
virtual const STATE state() const = 0;
virtual boost::shared_ptr<ThreadFactory> threadFactory() const = 0;
@@ -108,11 +108,26 @@
virtual size_t totalTaskCount() const = 0;
/**
- * Adds a task to be execued at some time in the future by a worker thread.
- *
- * @param value The task to run
+ * Gets the maximum pending task count. 0 indicates no maximum
*/
- virtual void add(boost::shared_ptr<Runnable>value) = 0;
+ virtual size_t pendingTaskCountMax() const = 0;
+
+ /**
+ * Adds a task to be executed at some time in the future by a worker thread.
+ *
+ * This method will block if pendingTaskCountMax() in not zero and pendingTaskCount()
+ * is greater than or equalt to pendingTaskCountMax(). If this method is called in the
+ * context of a ThreadManager worker thread it will throw a
+ * TooManyPendingTasksException
+ *
+ * @param task The task to queue for execution
+ *
+ * @param timeout Time to wait in milliseconds to add a task when a pending-task-count
+ * is specified
+ *
+ * @throws TooManyPendingTasksException Pending task count exceeds max pending task count
+ */
+ virtual void add(boost::shared_ptr<Runnable>task, long long timeout=0LL) = 0;
/**
* Removes a pending task
@@ -122,12 +137,14 @@
static boost::shared_ptr<ThreadManager> newThreadManager();
/**
- * Creates a simple thread manager the uses count number of worker threads
+ * Creates a simple thread manager the uses count number of worker threads and has
+ * a pendingTaskCountMax maximum pending tasks. The default, 0, specified no limit
+ * on pending tasks
*/
- static boost::shared_ptr<ThreadManager> newSimpleThreadManager(size_t count=4);
+ static boost::shared_ptr<ThreadManager> newSimpleThreadManager(size_t count=4, size_t pendingTaskCountMax=0);
class Task;
-
+
class Worker;
class Impl;
diff --git a/lib/cpp/src/concurrency/TimerManager.cpp b/lib/cpp/src/concurrency/TimerManager.cpp
index 050885d..8d6dd5e 100644
--- a/lib/cpp/src/concurrency/TimerManager.cpp
+++ b/lib/cpp/src/concurrency/TimerManager.cpp
@@ -97,7 +97,9 @@
timeout = manager_->taskMap_.begin()->first - now;
}
assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0));
- manager_->monitor_.wait(timeout);
+ try {
+ manager_->monitor_.wait(timeout);
+ } catch(TimedOutException& e) {}
now = Util::currentTime();
}
diff --git a/lib/cpp/src/concurrency/test/Tests.cpp b/lib/cpp/src/concurrency/test/Tests.cpp
index 96dd795..f4b0b62 100644
--- a/lib/cpp/src/concurrency/test/Tests.cpp
+++ b/lib/cpp/src/concurrency/test/Tests.cpp
@@ -29,9 +29,9 @@
if (runAll || args[0].compare("thread-factory") == 0) {
ThreadFactoryTests threadFactoryTests;
-
+
std::cout << "ThreadFactory tests..." << std::endl;
-
+
size_t count = 1000;
std::cout << "\t\tThreadFactory reap N threads test: N = " << count << std::endl;
@@ -61,7 +61,7 @@
time00 = Util::currentTime();
time01 = time00;
size_t count = 0;
-
+
while (time01 < time00 + 10) {
count++;
time01 = Util::currentTime();
@@ -99,6 +99,11 @@
ThreadManagerTests threadManagerTests;
assert(threadManagerTests.loadTest(taskCount, delay, workerCount));
+
+ std::cout << "\t\tThreadManager block test: worker count: " << workerCount << " delay: " << delay << std::endl;
+
+ assert(threadManagerTests.blockTest(delay, workerCount));
+
}
}
diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
index 90d0e4f..2e2dbdd 100644
--- a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
@@ -4,6 +4,7 @@
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
+#include <config.h>
#include <concurrency/Thread.h>
#include <concurrency/PosixThreadFactory.h>
#include <concurrency/Monitor.h>
@@ -19,7 +20,7 @@
using namespace facebook::thrift::concurrency;
/**
- * ThreadManagerTests class
+ * ThreadManagerTests class
*
* @author marc
* @version $Id:$
@@ -29,7 +30,7 @@
public:
static const double ERROR;
-
+
class Task: public Runnable {
public:
@@ -65,20 +66,20 @@
* Reap N threads
*/
class ReapNTask: public Runnable {
-
+
public:
-
+
ReapNTask(Monitor& monitor, int& activeCount) :
_monitor(monitor),
_count(activeCount) {}
-
+
void run() {
Synchronized s(_monitor);
-
+
_count--;
-
+
//std::cout << "\t\t\tthread count: " << _count << std::endl;
-
+
if (_count == 0) {
_monitor.notify();
}
@@ -128,7 +129,7 @@
class SynchStartTask: public Runnable {
public:
-
+
enum STATE {
UNINITIALIZED,
STARTING,
@@ -171,9 +172,9 @@
bool synchStartTest() {
Monitor monitor;
-
+
SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED;
-
+
shared_ptr<SynchStartTask> task = shared_ptr<SynchStartTask>(new SynchStartTask(monitor, state));
PosixThreadFactory threadFactory = PosixThreadFactory();
@@ -199,7 +200,10 @@
{
Synchronized s(monitor);
- monitor.wait(100);
+ try {
+ monitor.wait(100);
+ } catch(TimedOutException& e) {
+ }
if (state == SynchStartTask::STARTED) {
@@ -207,7 +211,7 @@
monitor.notify();
}
-
+
while (state == SynchStartTask::STOPPING) {
monitor.wait();
}
@@ -233,7 +237,10 @@
for (size_t ix = 0; ix < count; ix++) {
{
Synchronized s(monitor);
- monitor.wait(timeout);
+ try {
+ monitor.wait(timeout);
+ } catch(TimedOutException& e) {
+ }
}
}
diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
index 9f04435..89e6843 100644
--- a/lib/cpp/src/concurrency/test/ThreadManagerTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
@@ -21,7 +21,7 @@
using namespace facebook::thrift::concurrency;
/**
- * ThreadManagerTests class
+ * ThreadManagerTests class
*
* @author marc
* @version $Id:$
@@ -35,8 +35,8 @@
class Task: public Runnable {
public:
-
- Task(Monitor& monitor, size_t& count, long long timeout) :
+
+ Task(Monitor& monitor, size_t& count, long long timeout) :
_monitor(monitor),
_count(count),
_timeout(timeout),
@@ -49,27 +49,33 @@
{
Synchronized s(_sleep);
- _sleep.wait(_timeout);
+ try {
+ _sleep.wait(_timeout);
+ } catch(TimedOutException& e) {
+ ;
+ }catch(...) {
+ assert(0);
+ }
}
_endTime = Util::currentTime();
_done = true;
-
+
{
Synchronized s(_monitor);
// std::cout << "Thread " << _count << " completed " << std::endl;
-
+
_count--;
if (_count == 0) {
-
+
_monitor.notify();
}
}
}
-
+
Monitor& _monitor;
size_t& _count;
long long _timeout;
@@ -95,11 +101,11 @@
shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
threadFactory->priority(PosixThreadFactory::HIGHEST);
-
+
threadManager->threadFactory(threadFactory);
threadManager->start();
-
+
std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
for (size_t ix = 0; ix < count; ix++) {
@@ -118,7 +124,7 @@
Synchronized s(monitor);
while(activeCount > 0) {
-
+
monitor.wait();
}
}
@@ -133,7 +139,7 @@
long long maxTime = 0;
for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
-
+
shared_ptr<ThreadManagerTests::Task> task = *ix;
long long delta = task->_endTime - task->_startTime;
@@ -158,7 +164,7 @@
averageTime+= delta;
}
-
+
averageTime /= count;
std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl;
@@ -177,6 +183,167 @@
return success;
}
+
+ class BlockTask: public Runnable {
+
+ public:
+
+ BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) :
+ _monitor(monitor),
+ _bmonitor(bmonitor),
+ _count(count) {}
+
+ void run() {
+ {
+ Synchronized s(_bmonitor);
+
+ _bmonitor.wait();
+
+ }
+
+ {
+ Synchronized s(_monitor);
+
+ _count--;
+
+ if (_count == 0) {
+
+ _monitor.notify();
+ }
+ }
+ }
+
+ Monitor& _monitor;
+ Monitor& _bmonitor;
+ size_t& _count;
+ };
+
+ /**
+ * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the
+ * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */
+
+ bool blockTest(long long timeout=100LL, size_t workerCount=2) {
+
+ bool success = false;
+
+ try {
+
+ Monitor bmonitor;
+ Monitor monitor;
+
+ size_t pendingTaskMaxCount = workerCount;
+
+ size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
+
+ shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
+
+ shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+
+ threadFactory->priority(PosixThreadFactory::HIGHEST);
+
+ threadManager->threadFactory(threadFactory);
+
+ threadManager->start();
+
+ std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
+
+ for (size_t ix = 0; ix < workerCount; ix++) {
+
+ tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0])));
+ }
+
+ for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
+
+ tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1])));
+ }
+
+ for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+ threadManager->add(*ix);
+ }
+
+ if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
+ throw TException("Unexpected pending task count");
+ }
+
+ shared_ptr<ThreadManagerTests::BlockTask> extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));
+
+ try {
+ threadManager->add(extraTask, 1);
+ throw TException("Unexpected success adding task in excess of pending task count");
+ } catch(TimedOutException& e) {
+ }
+
+ std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
+
+ {
+ Synchronized s(bmonitor);
+
+ bmonitor.notifyAll();
+ }
+
+ {
+ Synchronized s(monitor);
+
+ while(activeCounts[0] != 0) {
+ monitor.wait();
+ }
+ }
+
+ std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
+
+ try {
+ threadManager->add(extraTask, 1);
+ } catch(TimedOutException& e) {
+ std::cout << "\t\t\t" << "add timed out unexpectedly" << std::endl;
+ throw TException("Unexpected timeout adding task");
+
+ } catch(TooManyPendingTasksException& e) {
+ std::cout << "\t\t\t" << "add encountered too many pending exepctions" << std::endl;
+ throw TException("Unexpected timeout adding task");
+ }
+
+ // Wake up tasks that were pending before and wait for them to complete
+
+ {
+ Synchronized s(bmonitor);
+
+ bmonitor.notifyAll();
+ }
+
+ {
+ Synchronized s(monitor);
+
+ while(activeCounts[1] != 0) {
+ monitor.wait();
+ }
+ }
+
+ // Wake up the extra task and wait for it to complete
+
+ {
+ Synchronized s(bmonitor);
+
+ bmonitor.notifyAll();
+ }
+
+ {
+ Synchronized s(monitor);
+
+ while(activeCounts[2] != 0) {
+ monitor.wait();
+ }
+ }
+
+ if(!(success = (threadManager->totalTaskCount() == 0))) {
+ throw TException("Unexpected pending task count");
+ }
+
+ } catch(TException& e) {
+ }
+
+ std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
+ return success;
+ }
};
const double ThreadManagerTests::ERROR = .20;
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
index 69ca7d5..a271934 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -68,7 +68,7 @@
shared_ptr<ThreadManager> threadManager) :
TServer(processor, serverTransport, transportFactory, protocolFactory),
threadManager_(threadManager),
- stop_(false) {}
+ stop_(false), timeout_(0) {}
TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport,
@@ -80,7 +80,7 @@
TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
inputProtocolFactory, outputProtocolFactory),
threadManager_(threadManager),
- stop_(false) {}
+ stop_(false), timeout_(0) {}
TThreadPoolServer::~TThreadPoolServer() {}
@@ -118,7 +118,7 @@
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
// Add to threadmanager pool
- threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, inputProtocol, outputProtocol)));
+ threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, inputProtocol, outputProtocol)), timeout_);
} catch (TTransportException& ttx) {
if (inputTransport != NULL) { inputTransport->close(); }
@@ -156,4 +156,7 @@
}
+long long TThreadPoolServer::timeout() const {return timeout_;}
+void TThreadPoolServer::timeout(long long value) {timeout_ = value;}
+
}}} // facebook::thrift::server
diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h
index fd74501..b8b64f2 100644
--- a/lib/cpp/src/server/TThreadPoolServer.h
+++ b/lib/cpp/src/server/TThreadPoolServer.h
@@ -41,6 +41,9 @@
virtual ~TThreadPoolServer();
virtual void serve();
+
+ virtual long long timeout() const;
+ virtual void timeout(long long value);
virtual void stop() {
stop_ = true;
@@ -52,6 +55,8 @@
boost::shared_ptr<ThreadManager> threadManager_;
volatile bool stop_;
+
+ volatile long long timeout_;
};