THRIFT-3932 fixed ThreadManager concurrency issues, added more tests in that area, did a little refactoring and prettying up along the way
Client: C++
This closes #1103
diff --git a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp
index 96cb6d6..a72d38b 100644
--- a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp
+++ b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp
@@ -126,54 +126,19 @@
return (void*)0;
}
-/**
- * POSIX Thread factory implementation
- */
-class BoostThreadFactory::Impl {
-
-private:
- bool detached_;
-
-public:
- Impl(bool detached) : detached_(detached) {}
-
- /**
- * Creates a new POSIX thread to run the runnable object
- *
- * @param runnable A runnable object
- */
- shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
- shared_ptr<BoostThread> result = shared_ptr<BoostThread>(new BoostThread(detached_, runnable));
- result->weakRef(result);
- runnable->thread(result);
- return result;
- }
-
- bool isDetached() const { return detached_; }
-
- void setDetached(bool value) { detached_ = value; }
-
- Thread::id_t getCurrentThreadId() const { return boost::this_thread::get_id(); }
-};
-
BoostThreadFactory::BoostThreadFactory(bool detached)
- : impl_(new BoostThreadFactory::Impl(detached)) {
+ : ThreadFactory(detached) {
}
shared_ptr<Thread> BoostThreadFactory::newThread(shared_ptr<Runnable> runnable) const {
- return impl_->newThread(runnable);
-}
-
-bool BoostThreadFactory::isDetached() const {
- return impl_->isDetached();
-}
-
-void BoostThreadFactory::setDetached(bool value) {
- impl_->setDetached(value);
+ shared_ptr<BoostThread> result = shared_ptr<BoostThread>(new BoostThread(isDetached(), runnable));
+ result->weakRef(result);
+ runnable->thread(result);
+ return result;
}
Thread::id_t BoostThreadFactory::getCurrentThreadId() const {
- return impl_->getCurrentThreadId();
+ return boost::this_thread::get_id();
}
}
}
diff --git a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h
index e6d1a56..7973245 100644
--- a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h
+++ b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h
@@ -55,21 +55,8 @@
// From ThreadFactory;
Thread::id_t getCurrentThreadId() const;
-
- /**
- * Sets detached mode of threads
- */
- virtual void setDetached(bool detached);
-
- /**
- * Gets current detached mode
- */
- virtual bool isDetached() const;
-
-private:
- class Impl;
- boost::shared_ptr<Impl> impl_;
};
+
}
}
} // apache::thrift::concurrency
diff --git a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp
index 05a3c02..6a0b47c 100644
--- a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp
@@ -214,149 +214,104 @@
}
/**
- * POSIX Thread factory implementation
+ * Converts generic posix thread schedule policy enums into pthread
+ * API values.
*/
-class PosixThreadFactory::Impl {
-
-private:
- POLICY policy_;
- PRIORITY priority_;
- int stackSize_;
- bool detached_;
-
- /**
- * Converts generic posix thread schedule policy enums into pthread
- * API values.
- */
- static int toPthreadPolicy(POLICY policy) {
- switch (policy) {
- case OTHER:
- return SCHED_OTHER;
- case FIFO:
- return SCHED_FIFO;
- case ROUND_ROBIN:
- return SCHED_RR;
- }
+static int toPthreadPolicy(PosixThreadFactory::POLICY policy) {
+ switch (policy) {
+ case PosixThreadFactory::OTHER:
return SCHED_OTHER;
+ case PosixThreadFactory::FIFO:
+ return SCHED_FIFO;
+ case PosixThreadFactory::ROUND_ROBIN:
+ return SCHED_RR;
}
+ return SCHED_OTHER;
+}
- /**
- * Converts relative thread priorities to absolute value based on posix
- * thread scheduler policy
- *
- * The idea is simply to divide up the priority range for the given policy
- * into the correpsonding relative priority level (lowest..highest) and
- * then pro-rate accordingly.
- */
- static int toPthreadPriority(POLICY policy, PRIORITY priority) {
- int pthread_policy = toPthreadPolicy(policy);
- int min_priority = 0;
- int max_priority = 0;
+/**
+ * Converts relative thread priorities to absolute value based on posix
+ * thread scheduler policy
+ *
+ * The idea is simply to divide up the priority range for the given policy
+ * into the corresponding relative priority level (lowest..highest) and
+ * then pro-rate accordingly.
+ */
+static int toPthreadPriority(PosixThreadFactory::POLICY policy, PosixThreadFactory::PRIORITY priority) {
+ int pthread_policy = toPthreadPolicy(policy);
+ int min_priority = 0;
+ int max_priority = 0;
#ifdef HAVE_SCHED_GET_PRIORITY_MIN
- min_priority = sched_get_priority_min(pthread_policy);
+ min_priority = sched_get_priority_min(pthread_policy);
#endif
#ifdef HAVE_SCHED_GET_PRIORITY_MAX
- max_priority = sched_get_priority_max(pthread_policy);
+ max_priority = sched_get_priority_max(pthread_policy);
#endif
- int quanta = (HIGHEST - LOWEST) + 1;
- float stepsperquanta = (float)(max_priority - min_priority) / quanta;
+ int quanta = (PosixThreadFactory::HIGHEST - PosixThreadFactory::LOWEST) + 1;
+ float stepsperquanta = (float)(max_priority - min_priority) / quanta;
- if (priority <= HIGHEST) {
- return (int)(min_priority + stepsperquanta * priority);
- } else {
- // should never get here for priority increments.
- assert(false);
- return (int)(min_priority + stepsperquanta * NORMAL);
- }
+ if (priority <= PosixThreadFactory::HIGHEST) {
+ return (int)(min_priority + stepsperquanta * priority);
+ } else {
+ // should never get here for priority increments.
+ assert(false);
+ return (int)(min_priority + stepsperquanta * PosixThreadFactory::NORMAL);
}
-
-public:
- Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached)
- : policy_(policy), priority_(priority), stackSize_(stackSize), detached_(detached) {}
-
- /**
- * Creates a new POSIX thread to run the runnable object
- *
- * @param runnable A runnable object
- */
- shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
- shared_ptr<PthreadThread> result
- = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_),
- toPthreadPriority(policy_, priority_),
- stackSize_,
- detached_,
- runnable));
- result->weakRef(result);
- runnable->thread(result);
- return result;
- }
-
- int getStackSize() const { return stackSize_; }
-
- void setStackSize(int value) { stackSize_ = value; }
-
- PRIORITY getPriority() const { return priority_; }
-
- /**
- * Sets priority.
- *
- * XXX
- * Need to handle incremental priorities properly.
- */
- void setPriority(PRIORITY value) { priority_ = value; }
-
- bool isDetached() const { return detached_; }
-
- void setDetached(bool value) { detached_ = value; }
-
- Thread::id_t getCurrentThreadId() const {
-
-#ifndef _WIN32
- return (Thread::id_t)pthread_self();
-#else
- return (Thread::id_t)pthread_self().p;
-#endif // _WIN32
- }
-};
+}
PosixThreadFactory::PosixThreadFactory(POLICY policy,
PRIORITY priority,
int stackSize,
bool detached)
- : impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {
+ : ThreadFactory(detached),
+ policy_(policy),
+ priority_(priority),
+ stackSize_(stackSize) {
+}
+
+PosixThreadFactory::PosixThreadFactory(bool detached)
+ : ThreadFactory(detached),
+ policy_(ROUND_ROBIN),
+ priority_(NORMAL),
+ stackSize_(1) {
}
shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const {
- return impl_->newThread(runnable);
+ shared_ptr<PthreadThread> result
+ = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_),
+ toPthreadPriority(policy_, priority_),
+ stackSize_,
+ isDetached(),
+ runnable));
+ result->weakRef(result);
+ runnable->thread(result);
+ return result;
}
int PosixThreadFactory::getStackSize() const {
- return impl_->getStackSize();
+ return stackSize_;
}
void PosixThreadFactory::setStackSize(int value) {
- impl_->setStackSize(value);
+ stackSize_ = value;
}
PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const {
- return impl_->getPriority();
+ return priority_;
}
-void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) {
- impl_->setPriority(value);
-}
-
-bool PosixThreadFactory::isDetached() const {
- return impl_->isDetached();
-}
-
-void PosixThreadFactory::setDetached(bool value) {
- impl_->setDetached(value);
+void PosixThreadFactory::setPriority(PRIORITY value) {
+ priority_ = value;
}
Thread::id_t PosixThreadFactory::getCurrentThreadId() const {
- return impl_->getCurrentThreadId();
+#ifndef _WIN32
+ return (Thread::id_t)pthread_self();
+#else
+ return (Thread::id_t)pthread_self().p;
+#endif // _WIN32
}
+
}
}
} // apache::thrift::concurrency
diff --git a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h
index b26d296..c1bbe5c 100644
--- a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h
+++ b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h
@@ -74,12 +74,19 @@
*
* By default threads are not joinable.
*/
-
PosixThreadFactory(POLICY policy = ROUND_ROBIN,
PRIORITY priority = NORMAL,
int stackSize = 1,
bool detached = true);
+ /**
+ * Provide a constructor compatible with the other factories
+ * The default policy is POLICY::ROUND_ROBIN.
+ * The default priority is PRIORITY::NORMAL.
+ * The default stackSize is 1.
+ */
+ PosixThreadFactory(bool detached);
+
// From ThreadFactory;
boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;
@@ -87,14 +94,14 @@
Thread::id_t getCurrentThreadId() const;
/**
- * Gets stack size for created threads
+ * Gets stack size for newly created threads
*
* @return int size in megabytes
*/
virtual int getStackSize() const;
/**
- * Sets stack size for created threads
+ * Sets stack size for newly created threads
*
* @param value size in megabytes
*/
@@ -110,19 +117,10 @@
*/
virtual void setPriority(PRIORITY priority);
- /**
- * Sets detached mode of threads
- */
- virtual void setDetached(bool detached);
-
- /**
- * Gets current detached mode
- */
- virtual bool isDetached() const;
-
private:
- class Impl;
- boost::shared_ptr<Impl> impl_;
+ POLICY policy_;
+ PRIORITY priority_;
+ int stackSize_;
};
}
}
diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
index d57e7ec..66c7e75 100644
--- a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
+++ b/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
@@ -116,53 +116,17 @@
return;
}
-/**
- * std::thread factory implementation
- */
-class StdThreadFactory::Impl {
-
-private:
- bool detached_;
-
-public:
- Impl(bool detached) : detached_(detached) {}
-
- /**
- * Creates a new std::thread to run the runnable object
- *
- * @param runnable A runnable object
- */
- boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const {
- boost::shared_ptr<StdThread> result
- = boost::shared_ptr<StdThread>(new StdThread(detached_, runnable));
- runnable->thread(result);
- return result;
- }
-
- bool isDetached() const { return detached_; }
-
- void setDetached(bool value) { detached_ = value; }
-
- Thread::id_t getCurrentThreadId() const { return std::this_thread::get_id(); }
-};
-
-StdThreadFactory::StdThreadFactory(bool detached) : impl_(new StdThreadFactory::Impl(detached)) {
+StdThreadFactory::StdThreadFactory(bool detached) : ThreadFactory(detached) {
}
boost::shared_ptr<Thread> StdThreadFactory::newThread(boost::shared_ptr<Runnable> runnable) const {
- return impl_->newThread(runnable);
-}
-
-bool StdThreadFactory::isDetached() const {
- return impl_->isDetached();
-}
-
-void StdThreadFactory::setDetached(bool value) {
- impl_->setDetached(value);
+ boost::shared_ptr<StdThread> result = boost::shared_ptr<StdThread>(new StdThread(isDetached(), runnable));
+ runnable->thread(result);
+ return result;
}
Thread::id_t StdThreadFactory::getCurrentThreadId() const {
- return impl_->getCurrentThreadId();
+ return std::this_thread::get_id();
}
}
}
diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.h b/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
index fb86bbf..88f00be 100644
--- a/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
+++ b/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
@@ -52,21 +52,8 @@
// From ThreadFactory;
Thread::id_t getCurrentThreadId() const;
-
- /**
- * Sets detached mode of threads
- */
- virtual void setDetached(bool detached);
-
- /**
- * Gets current detached mode
- */
- virtual bool isDetached() const;
-
-private:
- class Impl;
- boost::shared_ptr<Impl> impl_;
};
+
}
}
} // apache::thrift::concurrency
diff --git a/lib/cpp/src/thrift/concurrency/Thread.h b/lib/cpp/src/thrift/concurrency/Thread.h
index f7c7bd6..2e15489 100644
--- a/lib/cpp/src/thrift/concurrency/Thread.h
+++ b/lib/cpp/src/thrift/concurrency/Thread.h
@@ -108,8 +108,9 @@
virtual void start() = 0;
/**
- * Join this thread. Current thread blocks until this target thread
- * completes.
+ * Join this thread. If this thread is joinable, the calling thread blocks
+ * until this thread completes. If the target thread is not joinable, then
+ * nothing happens.
*/
virtual void join() = 0;
@@ -135,13 +136,21 @@
* object for execution
*/
class ThreadFactory {
+protected:
+ ThreadFactory(bool detached) : detached_(detached) { }
+
public:
- virtual ~ThreadFactory() {}
+ virtual ~ThreadFactory() { }
/**
* Gets current detached mode
*/
- virtual bool isDetached() const = 0;
+ bool isDetached() const { return detached_; }
+
+ /**
+ * Sets the detached disposition of newly created threads.
+ */
+ void setDetached(bool detached) { detached_ = detached; }
/**
* Create a new thread.
@@ -149,16 +158,17 @@
virtual boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const = 0;
/**
- * Sets detached mode of threads
- */
- virtual void setDetached(bool detached) = 0;
-
- static const Thread::id_t unknown_thread_id;
-
- /**
* Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread
*/
virtual Thread::id_t getCurrentThreadId() const = 0;
+
+ /**
+ * For code readability define the unknown/undefined thread id
+ */
+ static const Thread::id_t unknown_thread_id;
+
+private:
+ bool detached_;
};
}
diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
index 24bfeec..c4726dd 100644
--- a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
@@ -26,8 +26,8 @@
#include <boost/shared_ptr.hpp>
-#include <assert.h>
-#include <queue>
+#include <stdexcept>
+#include <deque>
#include <set>
#if defined(DEBUG)
@@ -49,6 +49,9 @@
* it maintains statistics on number of idle threads, number of active threads,
* task backlog, and average wait and service times.
*
+ * There are three different monitors used for signaling different conditions;
+ * however, they all share the same mutex_.
+ *
* @version $Id:$
*/
class ThreadManager::Impl : public ThreadManager {
@@ -62,25 +65,26 @@
expiredCount_(0),
state_(ThreadManager::UNINITIALIZED),
monitor_(&mutex_),
- maxMonitor_(&mutex_) {}
+ maxMonitor_(&mutex_),
+ workerMonitor_(&mutex_) {}
~Impl() { stop(); }
void start();
-
- void stop() { stopImpl(false); }
-
- void join() { stopImpl(true); }
+ void stop();
ThreadManager::STATE state() const { return state_; }
shared_ptr<ThreadFactory> threadFactory() const {
- Synchronized s(monitor_);
+ Guard g(mutex_);
return threadFactory_;
}
void threadFactory(shared_ptr<ThreadFactory> value) {
- Synchronized s(monitor_);
+ Guard g(mutex_);
+ if (threadFactory_ && threadFactory_->isDetached() != value->isDetached()) {
+ throw InvalidArgumentException();
+ }
threadFactory_ = value;
}
@@ -91,51 +95,65 @@
size_t idleWorkerCount() const { return idleCount_; }
size_t workerCount() const {
- Synchronized s(monitor_);
+ Guard g(mutex_);
return workerCount_;
}
size_t pendingTaskCount() const {
- Synchronized s(monitor_);
+ Guard g(mutex_);
return tasks_.size();
}
size_t totalTaskCount() const {
- Synchronized s(monitor_);
+ Guard g(mutex_);
return tasks_.size() + workerCount_ - idleCount_;
}
size_t pendingTaskCountMax() const {
- Synchronized s(monitor_);
+ Guard g(mutex_);
return pendingTaskCountMax_;
}
size_t expiredTaskCount() {
- Synchronized s(monitor_);
- size_t result = expiredCount_;
- expiredCount_ = 0;
- return result;
+ Guard g(mutex_);
+ return expiredCount_;
}
void pendingTaskCountMax(const size_t value) {
- Synchronized s(monitor_);
+ Guard g(mutex_);
pendingTaskCountMax_ = value;
}
- bool canSleep();
-
void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration);
void remove(shared_ptr<Runnable> task);
shared_ptr<Runnable> removeNextPending();
- void removeExpiredTasks();
+ void removeExpiredTasks() {
+ removeExpired(false);
+ }
void setExpireCallback(ExpireCallback expireCallback);
private:
- void stopImpl(bool join);
+ /**
+ * Remove one or more expired tasks.
+ * \param[in] justOne if true, try to remove just one task and return
+ */
+ void removeExpired(bool justOne);
+
+ /**
+ * \returns whether it is acceptable to block, depending on the current thread id
+ */
+ bool canSleep() const;
+
+ /**
+ * Lowers the maximum worker count and blocks until enough worker threads complete
+ * to get to the new maximum worker limit. The caller is responsible for acquiring
+ * a lock on the class mutex_.
+ */
+ void removeWorkersUnderLock(size_t value);
size_t workerCount_;
size_t workerMaxCount_;
@@ -148,11 +166,12 @@
shared_ptr<ThreadFactory> threadFactory_;
friend class ThreadManager::Task;
- std::queue<shared_ptr<Task> > tasks_;
+ typedef std::deque<shared_ptr<Task> > TaskQueue;
+ TaskQueue tasks_;
Mutex mutex_;
Monitor monitor_;
Monitor maxMonitor_;
- Monitor workerMonitor_;
+ Monitor workerMonitor_; // used to synchronize changes in worker count
friend class ThreadManager::Worker;
std::set<shared_ptr<Thread> > workers_;
@@ -163,7 +182,7 @@
class ThreadManager::Task : public Runnable {
public:
- enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE };
+ enum STATE { WAITING, EXECUTING, TIMEDOUT, COMPLETE };
Task(shared_ptr<Runnable> runnable, int64_t expiration = 0LL)
: runnable_(runnable),
@@ -194,7 +213,7 @@
enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };
public:
- Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED), idle_(false) {}
+ Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED) {}
~Worker() {}
@@ -212,78 +231,82 @@
* execute.
*/
void run() {
- bool active = false;
+ Guard g(manager_->mutex_);
+
+ /**
+ * This method has three parts; one is to check for and account for
+ * admitting a task which happens under a lock. Then the lock is released
+ * and the task itself is executed. Finally we do some accounting
+ * under lock again when the task completes.
+ */
+
+ /**
+ * Admitting
+ */
+
/**
* Increment worker semaphore and notify manager if worker count reached
* desired max
- *
- * Note: We have to release the monitor and acquire the workerMonitor
- * since that is what the manager blocks on for worker add/remove
*/
- {
- bool notifyManager = false;
- {
- Synchronized s(manager_->monitor_);
- active = manager_->workerCount_ < manager_->workerMaxCount_;
- if (active) {
- manager_->workerCount_++;
- notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
- }
- }
-
- if (notifyManager) {
- Synchronized s(manager_->workerMonitor_);
+ bool active = manager_->workerCount_ < manager_->workerMaxCount_;
+ if (active) {
+ if (++manager_->workerCount_ == manager_->workerMaxCount_) {
manager_->workerMonitor_.notify();
}
}
while (active) {
+ /**
+ * While holding manager monitor block for non-empty task queue (Also
+ * check that the thread hasn't been requested to stop). Once the queue
+ * is non-empty, dequeue a task, release monitor, and execute. If the
+ * worker max count has been decremented such that we exceed it, mark
+ * ourself inactive, decrement the worker count and notify the manager
+ * (technically we're notifying the next blocked thread but eventually
+ * the manager will see it).
+ */
+ active = isActive();
+
+ while (active && manager_->tasks_.empty()) {
+ manager_->idleCount_++;
+ manager_->monitor_.wait();
+ active = isActive();
+ manager_->idleCount_--;
+ }
+
shared_ptr<ThreadManager::Task> task;
- /**
- * While holding manager monitor block for non-empty task queue (Also
- * check that the thread hasn't been requested to stop). Once the queue
- * is non-empty, dequeue a task, release monitor, and execute. If the
- * worker max count has been decremented such that we exceed it, mark
- * ourself inactive, decrement the worker count and notify the manager
- * (technically we're notifying the next blocked thread but eventually
- * the manager will see it.
- */
- {
- Guard g(manager_->mutex_);
- active = isActive();
-
- while (active && manager_->tasks_.empty()) {
- manager_->idleCount_++;
- idle_ = true;
- manager_->monitor_.wait();
- active = isActive();
- idle_ = false;
- manager_->idleCount_--;
+ if (active) {
+ if (!manager_->tasks_.empty()) {
+ task = manager_->tasks_.front();
+ manager_->tasks_.pop_front();
+ if (task->state_ == ThreadManager::Task::WAITING) {
+ // If the state is changed to anything other than EXECUTING or TIMEDOUT here
+ // then the execution loop needs to be changed below.
+ task->state_ =
+ (task->getExpireTime() && task->getExpireTime() < Util::currentTime()) ?
+ ThreadManager::Task::TIMEDOUT :
+ ThreadManager::Task::EXECUTING;
+ }
}
- if (active) {
- manager_->removeExpiredTasks();
-
- if (!manager_->tasks_.empty()) {
- task = manager_->tasks_.front();
- manager_->tasks_.pop();
- if (task->state_ == ThreadManager::Task::WAITING) {
- task->state_ = ThreadManager::Task::EXECUTING;
- }
- }
-
- /* If we have a pending task max and we just dropped below it, wakeup any
- thread that might be blocked on add. */
- if (manager_->pendingTaskCountMax_ != 0
- && manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
- manager_->maxMonitor_.notify();
- }
+ /* If we have a pending task max and we just dropped below it, wakeup any
+ thread that might be blocked on add. */
+ if (manager_->pendingTaskCountMax_ != 0
+ && manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
+ manager_->maxMonitor_.notify();
}
}
+ /**
+ * Execution - not holding a lock
+ */
if (task) {
if (task->state_ == ThreadManager::Task::EXECUTING) {
+
+ // Release the lock so we can run the task without blocking the thread manager
+ manager_->mutex_.unlock();
+
try {
task->run();
} catch (const std::exception& e) {
@@ -291,29 +314,31 @@
} catch (...) {
GlobalOutput.printf("[ERROR] task->run() raised an unknown exception");
}
+
+ // Re-acquire the lock to proceed in the thread manager
+ manager_->mutex_.lock();
+
+ } else if (manager_->expireCallback_) {
+ // The only other state the task could have been in is TIMEDOUT (see above)
+ manager_->expireCallback_(task->getRunnable());
+ manager_->expiredCount_++;
}
}
}
- {
- Synchronized s(manager_->workerMonitor_);
- manager_->deadWorkers_.insert(this->thread());
- idle_ = true;
- manager_->workerCount_--;
- bool notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
- if (notifyManager) {
- manager_->workerMonitor_.notify();
- }
+ /**
+ * Final accounting for the worker thread that is done working
+ */
+ manager_->deadWorkers_.insert(this->thread());
+ if (--manager_->workerCount_ == manager_->workerMaxCount_) {
+ manager_->workerMonitor_.notify();
}
-
- return;
}
private:
ThreadManager::Impl* manager_;
friend class ThreadManager::Impl;
STATE state_;
- bool idle_;
};
void ThreadManager::Impl::addWorker(size_t value) {
@@ -324,11 +349,9 @@
newThreads.insert(threadFactory_->newThread(worker));
}
- {
- Synchronized s(monitor_);
- workerMaxCount_ += value;
- workers_.insert(newThreads.begin(), newThreads.end());
- }
+ Guard g(mutex_);
+ workerMaxCount_ += value;
+ workers_.insert(newThreads.begin(), newThreads.end());
for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end();
++ix) {
@@ -339,103 +362,92 @@
idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
}
- {
- Synchronized s(workerMonitor_);
- while (workerCount_ != workerMaxCount_) {
- workerMonitor_.wait();
- }
+ while (workerCount_ != workerMaxCount_) {
+ workerMonitor_.wait();
}
}
void ThreadManager::Impl::start() {
-
+ Guard g(mutex_);
if (state_ == ThreadManager::STOPPED) {
return;
}
- {
- Synchronized s(monitor_);
- if (state_ == ThreadManager::UNINITIALIZED) {
- if (!threadFactory_) {
- throw InvalidArgumentException();
- }
- state_ = ThreadManager::STARTED;
- monitor_.notifyAll();
+ if (state_ == ThreadManager::UNINITIALIZED) {
+ if (!threadFactory_) {
+ throw InvalidArgumentException();
}
+ state_ = ThreadManager::STARTED;
+ monitor_.notifyAll();
+ }
- while (state_ == STARTING) {
- monitor_.wait();
- }
+ while (state_ == STARTING) {
+ monitor_.wait();
}
}
-void ThreadManager::Impl::stopImpl(bool join) {
+void ThreadManager::Impl::stop() {
+ Guard g(mutex_);
bool doStop = false;
- if (state_ == ThreadManager::STOPPED) {
- return;
- }
- {
- Synchronized s(monitor_);
- if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING
- && state_ != ThreadManager::STOPPED) {
- doStop = true;
- state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
- }
+ if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING
+ && state_ != ThreadManager::STOPPED) {
+ doStop = true;
+ state_ = ThreadManager::JOINING;
}
if (doStop) {
- removeWorker(workerCount_);
+ removeWorkersUnderLock(workerCount_);
}
- // XXX
- // should be able to block here for transition to STOPPED since we're no
- // using shared_ptrs
-
- {
- Synchronized s(monitor_);
- state_ = ThreadManager::STOPPED;
- }
+ state_ = ThreadManager::STOPPED;
}
void ThreadManager::Impl::removeWorker(size_t value) {
- std::set<shared_ptr<Thread> > removedThreads;
- {
- Synchronized s(monitor_);
- if (value > workerMaxCount_) {
- throw InvalidArgumentException();
- }
-
- workerMaxCount_ -= value;
-
- if (idleCount_ < value) {
- for (size_t ix = 0; ix < idleCount_; ix++) {
- monitor_.notify();
- }
- } else {
- monitor_.notifyAll();
- }
- }
-
- {
- Synchronized s(workerMonitor_);
-
- while (workerCount_ != workerMaxCount_) {
- workerMonitor_.wait();
- }
-
- for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin();
- ix != deadWorkers_.end();
- ++ix) {
- idMap_.erase((*ix)->getId());
- workers_.erase(*ix);
- }
-
- deadWorkers_.clear();
- }
+ Guard g(mutex_);
+ removeWorkersUnderLock(value);
}
-bool ThreadManager::Impl::canSleep() {
+void ThreadManager::Impl::removeWorkersUnderLock(size_t value) {
+ if (value > workerMaxCount_) {
+ throw InvalidArgumentException();
+ }
+
+ workerMaxCount_ -= value;
+
+ if (idleCount_ > value) {
+ // There are more idle workers than we need to remove,
+ // so notify enough of them so they can terminate.
+ for (size_t ix = 0; ix < value; ix++) {
+ monitor_.notify();
+ }
+ } else {
+ // There are as many or fewer idle workers than we need to remove,
+ // so just notify them all so they can terminate.
+ monitor_.notifyAll();
+ }
+
+ while (workerCount_ != workerMaxCount_) {
+ workerMonitor_.wait();
+ }
+
+ for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin();
+ ix != deadWorkers_.end();
+ ++ix) {
+
+ // when used with a joinable thread factory, we join the threads as we remove them
+ if (!threadFactory_->isDetached()) {
+ (*ix)->join();
+ }
+
+ idMap_.erase((*ix)->getId());
+ workers_.erase(*ix);
+ }
+
+ deadWorkers_.clear();
+}
+
+bool ThreadManager::Impl::canSleep() const {
const Thread::id_t id = threadFactory_->getCurrentThreadId();
return idMap_.find(id) == idMap_.end();
}
@@ -453,7 +465,11 @@
"not started");
}
- removeExpiredTasks();
+ // if we're at a limit, remove an expired task to see if the limit clears
+ if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
+ removeExpired(true);
+ }
+
if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
if (canSleep() && timeout >= 0) {
while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
@@ -465,7 +481,7 @@
}
}
- tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration)));
+ tasks_.push_back(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration)));
// If idle thread is available notify it, otherwise all worker threads are
// running and will get around to this task in time.
@@ -475,13 +491,21 @@
}
void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
- (void)task;
- Synchronized s(monitor_);
+ Guard g(mutex_);
if (state_ != ThreadManager::STARTED) {
throw IllegalStateException(
"ThreadManager::Impl::remove ThreadManager not "
"started");
}
+
+ for (TaskQueue::iterator it = tasks_.begin(); it != tasks_.end(); ++it)
+ {
+ if ((*it)->getRunnable() == task)
+ {
+ tasks_.erase(it);
+ return;
+ }
+ }
}
boost::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
@@ -497,35 +521,40 @@
}
shared_ptr<ThreadManager::Task> task = tasks_.front();
- tasks_.pop();
+ tasks_.pop_front();
return task->getRunnable();
}
-void ThreadManager::Impl::removeExpiredTasks() {
- int64_t now = 0LL; // we won't ask for the time untile we need it
+void ThreadManager::Impl::removeExpired(bool justOne) {
+ // this is always called under a lock
+ int64_t now = 0LL;
- // note that this loop breaks at the first non-expiring task
- while (!tasks_.empty()) {
- shared_ptr<ThreadManager::Task> task = tasks_.front();
- if (task->getExpireTime() == 0LL) {
- break;
- }
+ for (TaskQueue::iterator it = tasks_.begin(); it != tasks_.end(); )
+ {
if (now == 0LL) {
now = Util::currentTime();
}
- if (task->getExpireTime() > now) {
- break;
+
+ if ((*it)->getExpireTime() > 0LL && (*it)->getExpireTime() < now) {
+ if (expireCallback_) {
+ expireCallback_((*it)->getRunnable());
+ }
+ it = tasks_.erase(it);
+ ++expiredCount_;
+ if (justOne) {
+ return;
+ }
}
- if (expireCallback_) {
- expireCallback_(task->getRunnable());
+ else
+ {
+ ++it;
}
- tasks_.pop();
- expiredCount_++;
}
}
void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
+ Guard g(mutex_);
expireCallback_ = expireCallback;
}
@@ -544,7 +573,6 @@
private:
const size_t workerCount_;
const size_t pendingTaskCountMax_;
- Monitor monitor_;
};
shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.h b/lib/cpp/src/thrift/concurrency/ThreadManager.h
index 2112845..d8bf71b 100644
--- a/lib/cpp/src/thrift/concurrency/ThreadManager.h
+++ b/lib/cpp/src/thrift/concurrency/ThreadManager.h
@@ -71,30 +71,45 @@
/**
* Stops the thread manager. Aborts all remaining unprocessed task, shuts
- * down all created worker threads, and realeases all allocated resources.
+ * down all created worker threads, and releases all allocated resources.
* This method blocks for all worker threads to complete, thus it can
* potentially block forever if a worker thread is running a task that
* won't terminate.
+ *
+ * Worker threads will be joined depending on the threadFactory's detached
+ * disposition.
*/
virtual void stop() = 0;
- /**
- * Joins the thread manager. This is the same as stop, except that it will
- * block until all the workers have finished their work. At that point
- * the ThreadManager will transition into the STOPPED state.
- */
- virtual void join() = 0;
-
enum STATE { UNINITIALIZED, STARTING, STARTED, JOINING, STOPPING, STOPPED };
virtual STATE state() const = 0;
+ /**
+ * \returns the current thread factory
+ */
virtual boost::shared_ptr<ThreadFactory> threadFactory() const = 0;
+ /**
+ * Set the thread factory.
+ * \throws InvalidArgumentException if the new thread factory has a different
+ * detached disposition than the one it replaces
+ */
virtual void threadFactory(boost::shared_ptr<ThreadFactory> value) = 0;
+ /**
+ * Adds worker thread(s).
+ */
virtual void addWorker(size_t value = 1) = 0;
+ /**
+ * Removes worker thread(s).
+ * Threads are joined if the thread factory detached disposition allows it.
+ * Blocks until the number of worker threads reaches the new limit.
+ * \param[in] value the number to remove
+ * \throws InvalidArgumentException if the value is greater than the number
+ * of workers
+ */
virtual void removeWorker(size_t value = 1) = 0;
/**
@@ -123,7 +138,8 @@
virtual size_t pendingTaskCountMax() const = 0;
/**
- * Gets the number of tasks which have been expired without being run.
+ * Gets the number of tasks which have expired without being run
+ * since start() was called.
*/
virtual size_t expiredTaskCount() = 0;
diff --git a/lib/cpp/src/thrift/concurrency/Util.h b/lib/cpp/src/thrift/concurrency/Util.h
index ba070b6..1a91599 100644
--- a/lib/cpp/src/thrift/concurrency/Util.h
+++ b/lib/cpp/src/thrift/concurrency/Util.h
@@ -98,7 +98,7 @@
* Converts struct timeval to arbitrary-sized ticks since epoch
*/
static void toTicks(int64_t& result, const struct timeval& value, int64_t ticksPerSec) {
- return toTicks(result, value.tv_sec, value.tv_usec, US_PER_S, ticksPerSec);
+ return toTicks(result, (unsigned long)value.tv_sec, (unsigned long)value.tv_usec, US_PER_S, ticksPerSec);
}
/**
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
index b81a522..63af85c 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
@@ -96,7 +96,7 @@
void TThreadPoolServer::serve() {
TServerFramework::serve();
- threadManager_->join();
+ threadManager_->stop();
}
int64_t TThreadPoolServer::getTimeout() const {
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp
index e15f8f1..9de1db8 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp
@@ -95,7 +95,6 @@
}
void TThreadedServer::serve() {
- threadFactory_->setDetached(false);
TServerFramework::serve();
// Ensure post-condition of no active clients
@@ -126,7 +125,7 @@
void TThreadedServer::onClientDisconnected(TConnectedClient* pClient) {
Synchronized sync(clientMonitor_);
- drainDeadClients(); // use the outgoing thread to do some maintenance on our dead client backlog
+ drainDeadClients(); // use the outgoing thread to do some maintenance on our dead client backlog
ClientMap::iterator it = activeClientMap_.find(pClient);
ClientMap::iterator end = it;
deadClientMap_.insert(it, ++end);
@@ -153,7 +152,7 @@
}
void TThreadedServer::TConnectedClientRunner::setThread(
- const boost::shared_ptr<apache::thrift::concurrency::Thread>& pThread) {
+ const boost::shared_ptr<apache::thrift::concurrency::Thread>& pThread) {
pThread_ = pThread;
}
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h
index 0f2cce9..758d1d9 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.h
+++ b/lib/cpp/src/thrift/server/TThreadedServer.h
@@ -34,8 +34,6 @@
* Manage clients using threads - threads are created one for each client and are
* released when the client disconnects. This server is used to make a dynamically
* scalable server up to the concurrent connection limit.
- *
- * The thread factory will be changed to a non-detached type.
*/
class TThreadedServer : public TServerFramework {
public:
@@ -46,7 +44,7 @@
const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory,
const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
= boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
- new apache::thrift::concurrency::PlatformThreadFactory));
+ new apache::thrift::concurrency::PlatformThreadFactory(false)));
TThreadedServer(
const boost::shared_ptr<apache::thrift::TProcessor>& processor,
@@ -55,7 +53,7 @@
const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory,
const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
= boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
- new apache::thrift::concurrency::PlatformThreadFactory));
+ new apache::thrift::concurrency::PlatformThreadFactory(false)));
TThreadedServer(
const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory,
@@ -66,7 +64,7 @@
const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory,
const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
= boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
- new apache::thrift::concurrency::PlatformThreadFactory));
+ new apache::thrift::concurrency::PlatformThreadFactory(false)));
TThreadedServer(
const boost::shared_ptr<apache::thrift::TProcessor>& processor,
@@ -77,7 +75,7 @@
const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory,
const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
= boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
- new apache::thrift::concurrency::PlatformThreadFactory));
+ new apache::thrift::concurrency::PlatformThreadFactory(false)));
virtual ~TThreadedServer();
diff --git a/lib/cpp/test/TPipeInterruptTest.cpp b/lib/cpp/test/TPipeInterruptTest.cpp
index e2f2489..80e4c1f 100644
--- a/lib/cpp/test/TPipeInterruptTest.cpp
+++ b/lib/cpp/test/TPipeInterruptTest.cpp
@@ -63,7 +63,7 @@
}
BOOST_AUTO_TEST_CASE(stress_pipe_accept_interruption) {
- int interruptIters = 100;
+ int interruptIters = 10;
for (int i = 0; i < interruptIters; ++i)
{
diff --git a/lib/cpp/test/TServerIntegrationTest.cpp b/lib/cpp/test/TServerIntegrationTest.cpp
index 2180448..fd7bae2 100644
--- a/lib/cpp/test/TServerIntegrationTest.cpp
+++ b/lib/cpp/test/TServerIntegrationTest.cpp
@@ -156,9 +156,9 @@
boost::shared_ptr<TTransportFactory>(new TTransportFactory),
boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)),
- bStressDone(false),
- bStressConnectionCount(0),
- bStressRequestCount(0) {
+ bStressDone(false),
+ bStressConnectionCount(0),
+ bStressRequestCount(0) {
pServer->setServerEventHandler(pEventHandler);
}
@@ -170,8 +170,8 @@
boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)),
bStressDone(false),
- bStressConnectionCount(0),
- bStressRequestCount(0) {
+ bStressConnectionCount(0),
+ bStressRequestCount(0) {
pServer->setServerEventHandler(pEventHandler);
}
@@ -217,10 +217,10 @@
* \param[in] purpose a description of the test for logging purposes
*/
void baseline(int64_t numToMake, int64_t expectedHWM, const std::string& purpose) {
- BOOST_TEST_MESSAGE(boost::format("Testing %1%: %2% with %3% clients, expect %4% HWM")
- % typeid(TServerType).name() % purpose % numToMake % expectedHWM);
+ BOOST_TEST_MESSAGE(boost::format("Testing %1%: %2% with %3% clients, expect %4% HWM")
+ % typeid(TServerType).name() % purpose % numToMake % expectedHWM);
- startServer();
+ startServer();
std::vector<boost::shared_ptr<TSocket> > holdSockets;
std::vector<boost::shared_ptr<boost::thread> > holdThreads;
@@ -303,14 +303,14 @@
* Helper method to stress the system
*/
void stressor() {
- while (!bStressDone) {
+ while (!bStressDone) {
boost::shared_ptr<TSocket> pSocket(new TSocket("localhost", getServerPort()), autoSocketCloser);
boost::shared_ptr<TProtocol> pProtocol(new TBinaryProtocol(pSocket));
ParentServiceClient client(pProtocol);
pSocket->open();
bStressConnectionCount.fetch_add(1, boost::memory_order_relaxed);
for (int i = 0; i < rand() % 1000; ++i) {
- client.incrementGeneration();
+ client.incrementGeneration();
bStressRequestCount.fetch_add(1, boost::memory_order_relaxed);
}
}
@@ -459,7 +459,7 @@
BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected) {
// This tests pre-THRIFT-2441 behavior: stopping the server blocks until clients
// disconnect.
- BOOST_TEST_MESSAGE("Testing stop with uninterruptable clients");
+ BOOST_TEST_MESSAGE("Testing stop with uninterruptable clients");
boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())
->setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
diff --git a/lib/cpp/test/concurrency/Tests.cpp b/lib/cpp/test/concurrency/Tests.cpp
index 0d81d7e..33af392 100644
--- a/lib/cpp/test/concurrency/Tests.cpp
+++ b/lib/cpp/test/concurrency/Tests.cpp
@@ -45,25 +45,38 @@
std::cout << "ThreadFactory tests..." << std::endl;
- size_t count = 1000;
- size_t floodLoops = 1;
- size_t floodCount = 100000;
+ int reapLoops = 20;
+ int reapCount = 1000;
+ size_t floodLoops = 3;
+ size_t floodCount = 20000;
- std::cout << "\t\tThreadFactory reap N threads test: N = " << count << std::endl;
+ std::cout << "\t\tThreadFactory reap N threads test: N = " << reapLoops << "x" << reapCount << std::endl;
- assert(threadFactoryTests.reapNThreads(count));
+ if (!threadFactoryTests.reapNThreads(reapLoops, reapCount)) {
+ std::cerr << "\t\ttThreadFactory reap N threads FAILED" << std::endl;
+ return 1;
+ }
- std::cout << "\t\tThreadFactory floodN threads test: N = " << floodCount << std::endl;
+ std::cout << "\t\tThreadFactory flood N threads test: N = " << floodLoops << "x" << floodCount << std::endl;
- assert(threadFactoryTests.floodNTest(floodLoops, floodCount));
+ if (!threadFactoryTests.floodNTest(floodLoops, floodCount)) {
+ std::cerr << "\t\ttThreadFactory flood N threads FAILED" << std::endl;
+ return 1;
+ }
std::cout << "\t\tThreadFactory synchronous start test" << std::endl;
- assert(threadFactoryTests.synchStartTest());
+ if (!threadFactoryTests.synchStartTest()) {
+ std::cerr << "\t\ttThreadFactory synchronous start FAILED" << std::endl;
+ return 1;
+ }
std::cout << "\t\tThreadFactory monitor timeout test" << std::endl;
- assert(threadFactoryTests.monitorTimeoutTest());
+ if (!threadFactoryTests.monitorTimeoutTest()) {
+ std::cerr << "\t\ttThreadFactory monitor timeout FAILED" << std::endl;
+ return 1;
+ }
}
if (runAll || args[0].compare("util") == 0) {
@@ -97,7 +110,10 @@
TimerManagerTests timerManagerTests;
- assert(timerManagerTests.test00());
+ if (!timerManagerTests.test00()) {
+ std::cerr << "\t\tTimerManager tests FAILED" << std::endl;
+ return 1;
+ }
}
if (runAll || args[0].compare("thread-manager") == 0) {
@@ -105,24 +121,34 @@
std::cout << "ThreadManager tests..." << std::endl;
{
-
size_t workerCount = 100;
-
- size_t taskCount = 100000;
-
+ size_t taskCount = 50000;
int64_t delay = 10LL;
+ ThreadManagerTests threadManagerTests;
+
+ std::cout << "\t\tThreadManager api test:" << std::endl;
+
+ if (!threadManagerTests.apiTest()) {
+ std::cerr << "\t\tThreadManager apiTest FAILED" << std::endl;
+ return 1;
+ }
+
std::cout << "\t\tThreadManager load test: worker count: " << workerCount
<< " task count: " << taskCount << " delay: " << delay << std::endl;
- ThreadManagerTests threadManagerTests;
-
- assert(threadManagerTests.loadTest(taskCount, delay, workerCount));
+ if (!threadManagerTests.loadTest(taskCount, delay, workerCount)) {
+ std::cerr << "\t\tThreadManager loadTest FAILED" << std::endl;
+ return 1;
+ }
std::cout << "\t\tThreadManager block test: worker count: " << workerCount
<< " delay: " << delay << std::endl;
- assert(threadManagerTests.blockTest(delay, workerCount));
+ if (!threadManagerTests.blockTest(delay, workerCount)) {
+ std::cerr << "\t\tThreadManager blockTest FAILED" << std::endl;
+ return 1;
+ }
}
}
@@ -134,13 +160,13 @@
size_t minWorkerCount = 2;
- size_t maxWorkerCount = 512;
+ size_t maxWorkerCount = 64;
size_t tasksPerWorker = 1000;
- int64_t delay = 10LL;
+ int64_t delay = 5LL;
- for (size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount *= 2) {
+ for (size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount *= 4) {
size_t taskCount = workerCount * tasksPerWorker;
@@ -149,8 +175,15 @@
ThreadManagerTests threadManagerTests;
- threadManagerTests.loadTest(taskCount, delay, workerCount);
+ if (!threadManagerTests.loadTest(taskCount, delay, workerCount))
+ {
+ std::cerr << "\t\tThreadManager loadTest FAILED" << std::endl;
+ return 1;
+ }
}
}
}
+
+ std::cout << "ALL TESTS PASSED" << std::endl;
+ return 0;
}
diff --git a/lib/cpp/test/concurrency/ThreadFactoryTests.h b/lib/cpp/test/concurrency/ThreadFactoryTests.h
index 3ad14ca..4fc688c 100644
--- a/lib/cpp/test/concurrency/ThreadFactoryTests.h
+++ b/lib/cpp/test/concurrency/ThreadFactoryTests.h
@@ -43,36 +43,6 @@
class ThreadFactoryTests {
public:
- static const double TEST_TOLERANCE;
-
- class Task : public Runnable {
-
- public:
- Task() {}
-
- void run() { std::cout << "\t\t\tHello World" << std::endl; }
- };
-
- /**
- * Hello world test
- */
- bool helloWorldTest() {
-
- PlatformThreadFactory threadFactory = PlatformThreadFactory();
-
- shared_ptr<Task> task = shared_ptr<Task>(new ThreadFactoryTests::Task());
-
- shared_ptr<Thread> thread = threadFactory.newThread(task);
-
- thread->start();
-
- thread->join();
-
- std::cout << "\t\t\tSuccess!" << std::endl;
-
- return true;
- }
-
/**
* Reap N threads
*/
@@ -244,15 +214,22 @@
return true;
}
- /** See how accurate monitor timeout is. */
+ /**
+ * The only guarantee a monitor timeout can give you is that
+ * it will take "at least" as long as the timeout, no less.
+ * There is absolutely no guarantee around regaining execution
+ * near the timeout. On a busy system (like inside a third party
+ * CI environment) it could take quite a bit longer than the
+ * requested timeout, and that's ok.
+ */
- bool monitorTimeoutTest(size_t count = 1000, int64_t timeout = 10) {
+ bool monitorTimeoutTest(int64_t count = 1000, int64_t timeout = 2) {
Monitor monitor;
int64_t startTime = Util::currentTime();
- for (size_t ix = 0; ix < count; ix++) {
+ for (int64_t ix = 0; ix < count; ix++) {
{
Synchronized s(monitor);
try {
@@ -264,18 +241,11 @@
int64_t endTime = Util::currentTime();
- double error = ((endTime - startTime) - (count * timeout)) / (double)(count * timeout);
-
- if (error < 0.0) {
-
- error *= 1.0;
- }
-
- bool success = error < ThreadFactoryTests::TEST_TOLERANCE;
+ bool success = (endTime - startTime) >= (count * timeout);
std::cout << "\t\t\t" << (success ? "Success" : "Failure")
- << "! expected time: " << count * timeout
- << "ms elapsed time: " << endTime - startTime << "ms error%: " << error * 100.0
+ << ": minimum required time to elapse " << count * timeout
+ << "ms; actual elapsed time " << endTime - startTime << "ms"
<< std::endl;
return success;
@@ -285,17 +255,15 @@
public:
FloodTask(const size_t id) : _id(id) {}
~FloodTask() {
- if (_id % 1000 == 0) {
+ if (_id % 10000 == 0) {
std::cout << "\t\tthread " << _id << " done" << std::endl;
}
}
void run() {
- if (_id % 1000 == 0) {
+ if (_id % 10000 == 0) {
std::cout << "\t\tthread " << _id << " started" << std::endl;
}
-
- THRIFT_SLEEP_USEC(1);
}
const size_t _id;
};
@@ -321,8 +289,6 @@
thread->start();
- THRIFT_SLEEP_USEC(1);
-
} catch (TException& e) {
std::cout << "\t\t\tfailed to start " << lix* count + tix << " thread " << e.what()
@@ -341,7 +307,6 @@
}
};
-const double ThreadFactoryTests::TEST_TOLERANCE = .20;
}
}
}
diff --git a/lib/cpp/test/concurrency/ThreadManagerTests.h b/lib/cpp/test/concurrency/ThreadManagerTests.h
index b196813..b5925ac 100644
--- a/lib/cpp/test/concurrency/ThreadManagerTests.h
+++ b/lib/cpp/test/concurrency/ThreadManagerTests.h
@@ -24,9 +24,9 @@
#include <thrift/concurrency/Util.h>
#include <assert.h>
+#include <deque>
#include <set>
#include <iostream>
-#include <set>
#include <stdint.h>
namespace apache {
@@ -36,9 +36,26 @@
using namespace apache::thrift::concurrency;
-class ThreadManagerTests {
+static std::deque<boost::shared_ptr<Runnable> > m_expired; // runnables received by expiredNotifier, in arrival order
+static void expiredNotifier(boost::shared_ptr<Runnable> runnable) // installed through setExpireCallback() in apiTestWithThreadFactory
+{
+ m_expired.push_back(runnable); // NOTE(review): no locking here - confirm which thread invokes the callback
+}
- static const double TEST_TOLERANCE;
+static void sleep_(int64_t millisec) { // test helper: block the caller by waiting on a private Monitor with a timeout of millisec
+ Monitor _sleep;
+ Synchronized s(_sleep); // acquire the monitor before waiting on it
+
+ try {
+ _sleep.wait(millisec);
+ } catch (TimedOutException&) {
+ ; // expected outcome: _sleep is local to this call, so nothing else can notify it
+ } catch (...) {
+ assert(0); // any other exception is treated as a test bug
+ }
+}
+
+class ThreadManagerTests {
public:
class Task : public Runnable {
@@ -51,17 +68,7 @@
_startTime = Util::currentTime();
- {
- Synchronized s(_sleep);
-
- try {
- _sleep.wait(_timeout);
- } catch (TimedOutException&) {
- ;
- } catch (...) {
- assert(0);
- }
- }
+ sleep_(_timeout);
_endTime = Util::currentTime();
@@ -73,9 +80,7 @@
// std::cout << "Thread " << _count << " completed " << std::endl;
_count--;
-
- if (_count == 0) {
-
+ if (_count % 10000 == 0) {
_monitor.notify();
}
}
@@ -130,11 +135,13 @@
threadManager->add(*ix);
}
+ std::cout << "\t\t\t\tloaded " << count << " tasks to execute" << std::endl;
+
{
Synchronized s(monitor);
while (activeCount > 0) {
-
+ std::cout << "\t\t\t\tactiveCount = " << activeCount << std::endl;
monitor.wait();
}
}
@@ -179,23 +186,15 @@
averageTime /= count;
- std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime
- << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime
+ std::cout << "\t\t\tfirst start: " << firstTime << " Last end: " << lastTime
+ << " min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime
<< "ms" << std::endl;
- double expectedTime = (double(count + (workerCount - 1)) / workerCount) * timeout;
-
- double error = ((time01 - time00) - expectedTime) / expectedTime;
-
- if (error < 0) {
- error *= -1.0;
- }
-
- bool success = error < TEST_TOLERANCE;
+ bool success = (time01 - time00) >= ((int64_t)count * timeout) / (int64_t)workerCount;
std::cout << "\t\t\t" << (success ? "Success" : "Failure")
- << "! expected time: " << expectedTime << "ms elapsed time: " << time01 - time00
- << "ms error%: " << error * 100.0 << std::endl;
+ << "! expected time: " << ((int64_t)count * timeout) / (int64_t)workerCount << "ms elapsed time: " << time01 - time00
+ << "ms" << std::endl;
return success;
}
@@ -203,30 +202,36 @@
class BlockTask : public Runnable {
public:
- BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count)
- : _monitor(monitor), _bmonitor(bmonitor), _count(count) {}
+ BlockTask(Monitor& entryMonitor, Monitor& blockMonitor, bool& blocked, Monitor& doneMonitor, size_t& count)
+ : _entryMonitor(entryMonitor), _entered(false), _blockMonitor(blockMonitor), _blocked(blocked), _doneMonitor(doneMonitor), _count(count) {}
void run() {
{
- Synchronized s(_bmonitor);
-
- _bmonitor.wait();
+ Synchronized s(_entryMonitor);
+ _entered = true;
+ _entryMonitor.notify();
}
{
- Synchronized s(_monitor);
+ Synchronized s(_blockMonitor);
+ while (_blocked) {
+ _blockMonitor.wait();
+ }
+ }
- _count--;
-
- if (_count == 0) {
-
- _monitor.notify();
+ {
+ Synchronized s(_doneMonitor);
+ if (--_count == 0) {
+ _doneMonitor.notify();
}
}
}
- Monitor& _monitor;
- Monitor& _bmonitor;
+ Monitor& _entryMonitor;
+ bool _entered;
+ Monitor& _blockMonitor;
+ bool& _blocked;
+ Monitor& _doneMonitor;
size_t& _count;
};
@@ -240,8 +245,10 @@
try {
- Monitor bmonitor;
- Monitor monitor;
+ Monitor entryMonitor; // not used by this test
+ Monitor blockMonitor;
+ bool blocked[] = {true, true, true};
+ Monitor doneMonitor;
size_t pendingTaskMaxCount = workerCount;
@@ -260,21 +267,22 @@
threadManager->start();
- std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
+ std::vector<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
+ tasks.reserve(workerCount + pendingTaskMaxCount);
for (size_t ix = 0; ix < workerCount; ix++) {
- tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(
- new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[0])));
+ tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>(
+ new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[0], doneMonitor, activeCounts[0])));
}
for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
- tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(
- new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[1])));
+ tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>(
+ new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[1], doneMonitor, activeCounts[1])));
}
- for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin();
+ for (std::vector<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin();
ix != tasks.end();
ix++) {
threadManager->add(*ix);
@@ -285,7 +293,7 @@
}
shared_ptr<ThreadManagerTests::BlockTask> extraTask(
- new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));
+ new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[2], doneMonitor, activeCounts[2]));
try {
threadManager->add(extraTask, 1);
@@ -309,16 +317,15 @@
<< "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
{
- Synchronized s(bmonitor);
-
- bmonitor.notifyAll();
+ Synchronized s(blockMonitor);
+ blocked[0] = false;
+ blockMonitor.notifyAll();
}
{
- Synchronized s(monitor);
-
+ Synchronized s(doneMonitor);
while (activeCounts[0] != 0) {
- monitor.wait();
+ doneMonitor.wait();
}
}
@@ -341,37 +348,37 @@
// Wake up tasks that were pending before and wait for them to complete
{
- Synchronized s(bmonitor);
-
- bmonitor.notifyAll();
+ Synchronized s(blockMonitor);
+ blocked[1] = false;
+ blockMonitor.notifyAll();
}
{
- Synchronized s(monitor);
-
+ Synchronized s(doneMonitor);
while (activeCounts[1] != 0) {
- monitor.wait();
+ doneMonitor.wait();
}
}
// Wake up the extra task and wait for it to complete
{
- Synchronized s(bmonitor);
-
- bmonitor.notifyAll();
+ Synchronized s(blockMonitor);
+ blocked[2] = false;
+ blockMonitor.notifyAll();
}
{
- Synchronized s(monitor);
-
+ Synchronized s(doneMonitor);
while (activeCounts[2] != 0) {
- monitor.wait();
+ doneMonitor.wait();
}
}
+ threadManager->stop();
+
if (!(success = (threadManager->totalTaskCount() == 0))) {
- throw TException("Unexpected pending task count");
+ throw TException("Unexpected total task count");
}
} catch (TException& e) {
@@ -381,9 +388,295 @@
std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
return success;
}
+
+
+ bool apiTest() {
+
+ // prove currentTime has milliseconds granularity since many other things depend on it
+ int64_t a = Util::currentTime();
+ sleep_(100);
+ int64_t b = Util::currentTime();
+ if (b - a < 50 || b - a > 150) {
+ std::cerr << "\t\t\texpected 100ms gap, found " << (b-a) << "ms gap instead." << std::endl;
+ return false;
+ }
+
+#if !USE_BOOST_THREAD && !USE_STD_THREAD
+ // test once with a detached thread factory and once with a joinable thread factory
+
+ shared_ptr<PosixThreadFactory> threadFactory
+ = shared_ptr<PosixThreadFactory>(new PosixThreadFactory(false));
+
+ std::cout << "\t\t\tapiTest with joinable thread factory" << std::endl;
+ if (!apiTestWithThreadFactory(threadFactory)) {
+ return false;
+ }
+
+ threadFactory.reset(new PosixThreadFactory(true));
+ std::cout << "\t\t\tapiTest with detached thread factory" << std::endl;
+ return apiTestWithThreadFactory(threadFactory);
+#else
+ return apiTestWithThreadFactory(shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
+#endif
+
+ }
+
+ bool apiTestWithThreadFactory(shared_ptr<PlatformThreadFactory> threadFactory)
+ {
+ shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(1);
+ threadManager->threadFactory(threadFactory);
+
+#if !USE_BOOST_THREAD && !USE_STD_THREAD
+ threadFactory->setPriority(PosixThreadFactory::HIGHEST);
+
+ // verify we cannot change the thread factory to one with the opposite detached setting
+ shared_ptr<PlatformThreadFactory> threadFactory2
+ = shared_ptr<PosixThreadFactory>(new PlatformThreadFactory(
+ PosixThreadFactory::ROUND_ROBIN,
+ PosixThreadFactory::NORMAL,
+ 1,
+ !threadFactory->isDetached()));
+ try {
+ threadManager->threadFactory(threadFactory2);
+ // if the call succeeded we changed the thread factory to one that had the opposite setting for "isDetached()".
+ // this is bad, because the thread manager checks with the thread factory to see if it should join threads
+ // as they are leaving - so the detached status of new threads cannot change while there are existing threads.
+ std::cerr << "\t\t\tShould not be able to change thread factory detached disposition" << std::endl;
+ return false;
+ }
+ catch (InvalidArgumentException& ex) {
+ /* expected */
+ }
+#endif
+
+ std::cout << "\t\t\t\tstarting.. " << std::endl;
+
+ threadManager->start();
+ threadManager->setExpireCallback(expiredNotifier); // apache::thrift::stdcxx::bind(&ThreadManagerTests::expiredNotifier, this));
+
+#define EXPECT(FUNC, COUNT) { size_t c = FUNC; if (c != COUNT) { std::cerr << "expected " #FUNC" to be " #COUNT ", but was " << c << std::endl; return false; } }
+
+ EXPECT(threadManager->workerCount(), 1);
+ EXPECT(threadManager->idleWorkerCount(), 1);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+
+ std::cout << "\t\t\t\tadd 2nd worker.. " << std::endl;
+
+ threadManager->addWorker();
+
+ EXPECT(threadManager->workerCount(), 2);
+ EXPECT(threadManager->idleWorkerCount(), 2);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+
+ std::cout << "\t\t\t\tremove 2nd worker.. " << std::endl;
+
+ threadManager->removeWorker();
+
+ EXPECT(threadManager->workerCount(), 1);
+ EXPECT(threadManager->idleWorkerCount(), 1);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+
+ std::cout << "\t\t\t\tremove 1st worker.. " << std::endl;
+
+ threadManager->removeWorker();
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+
+ std::cout << "\t\t\t\tadd blocking task.. " << std::endl;
+
+ // We're going to throw a blocking task into the mix
+ Monitor entryMonitor; // signaled when task is running
+ Monitor blockMonitor; // to be signaled to unblock the task
+ bool blocked(true); // set to false before notifying
+ Monitor doneMonitor; // signaled when count reaches zero
+ size_t activeCount = 1;
+ shared_ptr<ThreadManagerTests::BlockTask> blockingTask(
+ new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked, doneMonitor, activeCount));
+ threadManager->add(blockingTask);
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 1);
+
+ std::cout << "\t\t\t\tadd other task.. " << std::endl;
+
+ shared_ptr<ThreadManagerTests::Task> otherTask(
+ new ThreadManagerTests::Task(doneMonitor, activeCount, 0));
+
+ threadManager->add(otherTask);
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 2);
+
+ std::cout << "\t\t\t\tremove blocking task specifically.. " << std::endl;
+
+ threadManager->remove(blockingTask);
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 1);
+
+ std::cout << "\t\t\t\tremove next pending task.." << std::endl;
+
+ shared_ptr<Runnable> nextTask = threadManager->removeNextPending();
+ if (nextTask != otherTask) {
+ std::cerr << "\t\t\t\t\texpected removeNextPending to return otherTask" << std::endl;
+ return false;
+ }
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+
+ std::cout << "\t\t\t\tremove next pending task (none left).." << std::endl;
+
+ nextTask = threadManager->removeNextPending();
+ if (nextTask) {
+ std::cerr << "\t\t\t\t\texpected removeNextPending to return an empty Runnable" << std::endl;
+ return false;
+ }
+
+ std::cout << "\t\t\t\tadd 2 expired tasks and 1 not.." << std::endl;
+
+ shared_ptr<ThreadManagerTests::Task> expiredTask(
+ new ThreadManagerTests::Task(doneMonitor, activeCount, 0));
+
+ threadManager->add(expiredTask, 0, 1);
+ threadManager->add(blockingTask); // add one that hasn't expired to make sure it gets skipped
+ threadManager->add(expiredTask, 0, 1); // add a second expired to ensure removeExpiredTasks removes both
+
+ sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1 millisecond
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 3);
+ EXPECT(threadManager->expiredTaskCount(), 0);
+
+ std::cout << "\t\t\t\tremove expired tasks.." << std::endl;
+
+ if (!m_expired.empty()) {
+ std::cerr << "\t\t\t\t\texpected m_expired to be empty" << std::endl;
+ return false;
+ }
+
+ threadManager->removeExpiredTasks();
+
+ if (m_expired.size() != 2) {
+ std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl;
+ return false;
+ }
+
+ if (m_expired.front() != expiredTask) {
+ std::cerr << "\t\t\t\t\texpected m_expired[0] to be the expired task" << std::endl;
+ return false;
+ }
+ m_expired.pop_front();
+
+ if (m_expired.front() != expiredTask) {
+ std::cerr << "\t\t\t\t\texpected m_expired[1] to be the expired task" << std::endl;
+ return false;
+ }
+
+ m_expired.clear();
+
+ threadManager->remove(blockingTask);
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+ EXPECT(threadManager->expiredTaskCount(), 2);
+
+ std::cout << "\t\t\t\tadd expired task (again).." << std::endl;
+
+ threadManager->add(expiredTask, 0, 1); // expires in 1ms
+ sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1ms
+
+ std::cout << "\t\t\t\tadd worker to consume expired task.." << std::endl;
+
+ threadManager->addWorker();
+ sleep_(100); // make sure it has time to spin up and expire the task
+
+ if (m_expired.empty()) {
+ std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl;
+ return false;
+ }
+
+ if (m_expired.front() != expiredTask) {
+ std::cerr << "\t\t\t\t\texpected m_expired to be the expired task" << std::endl;
+ return false;
+ }
+
+ m_expired.clear();
+
+ EXPECT(threadManager->workerCount(), 1);
+ EXPECT(threadManager->idleWorkerCount(), 1);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+ EXPECT(threadManager->expiredTaskCount(), 3);
+
+ std::cout << "\t\t\t\ttry to remove too many workers" << std::endl;
+ try {
+ threadManager->removeWorker(2);
+ std::cerr << "\t\t\t\t\texpected InvalidArgumentException" << std::endl;
+ return false;
+ } catch (const InvalidArgumentException&) {
+ /* expected */
+ }
+
+ std::cout << "\t\t\t\tremove worker.. " << std::endl;
+
+ threadManager->removeWorker();
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+ EXPECT(threadManager->expiredTaskCount(), 3);
+
+ std::cout << "\t\t\t\tadd blocking task.. " << std::endl;
+
+ threadManager->add(blockingTask);
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 1);
+
+ std::cout << "\t\t\t\tadd worker.. " << std::endl;
+
+ threadManager->addWorker();
+ {
+ Synchronized s(entryMonitor);
+ while (!blockingTask->_entered) {
+ entryMonitor.wait();
+ }
+ }
+
+ EXPECT(threadManager->workerCount(), 1);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+
+ std::cout << "\t\t\t\tunblock task and remove worker.. " << std::endl;
+
+ {
+ Synchronized s(blockMonitor);
+ blocked = false;
+ blockMonitor.notifyAll();
+ }
+ threadManager->removeWorker();
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+
+ std::cout << "\t\t\t\tcleanup.. " << std::endl;
+
+ blockingTask.reset();
+ threadManager.reset();
+ return true;
+ }
};
-const double ThreadManagerTests::TEST_TOLERANCE = .20;
}
}
}
diff --git a/lib/cpp/test/concurrency/TimerManagerTests.h b/lib/cpp/test/concurrency/TimerManagerTests.h
index c6fa4cf..32d3935 100644
--- a/lib/cpp/test/concurrency/TimerManagerTests.h
+++ b/lib/cpp/test/concurrency/TimerManagerTests.h
@@ -34,8 +34,6 @@
class TimerManagerTests {
- static const double TEST_TOLERANCE;
-
public:
class Task : public Runnable {
public:
@@ -52,25 +50,11 @@
void run() {
_endTime = Util::currentTime();
-
- // Figure out error percentage
-
- int64_t delta = _endTime - _startTime;
-
- delta = delta > _timeout ? delta - _timeout : _timeout - delta;
-
- double error = double(delta) / _timeout;
-
- if (error < TEST_TOLERANCE) {
- _success = true;
- }
-
- _done = true;
-
- std::cout << "\t\t\tTimerManagerTests::Task[" << this << "] done" << std::endl; // debug
+ _success = (_endTime - _startTime) >= _timeout;
{
Synchronized s(_monitor);
+ _done = true;
_monitor.notifyAll();
}
}
@@ -147,7 +131,6 @@
Monitor _monitor;
};
-const double TimerManagerTests::TEST_TOLERANCE = .20;
}
}
}
diff --git a/lib/cpp/test/processor/ProcessorTest.cpp b/lib/cpp/test/processor/ProcessorTest.cpp
index 0066657..a4e984c 100644
--- a/lib/cpp/test/processor/ProcessorTest.cpp
+++ b/lib/cpp/test/processor/ProcessorTest.cpp
@@ -299,7 +299,7 @@
*/
uint32_t checkNewConnEvents(const boost::shared_ptr<EventLog>& log) {
// Check for an ET_CONN_CREATED event
- Event event = log->waitForEvent();
+ Event event = log->waitForEvent(2500);
BOOST_CHECK_EQUAL(EventLog::ET_CONN_CREATED, event.type);
// Some servers call the processContext() hook immediately.