Modified facebook::thrift::concurrency::Monitor.wait:
Throw TimedOutException on wait timeout so caller can distinguish between timeout and event.
Modified facebook::thrift::concurrency::PthreadThread.start:
Throw SystemrResourceException on any pthread_* function call failure rather than asserting 0.
Added facebook::thrift::concurrency::Thread.id() and facebook::thrift::concurrency::ThreadFactory.currentThreadId():
Return thread-id of thread and current thread respectively. Needed for reentrancy tests in ThreadManager
Added facebook::thrift::concurrency::ThreadManager.pendingTaskCountMaxN
Modified facebook::thrift::concurrency::ThreadManager.add():
Now support a maximum pending task count and block if the current pending task count is max.
If timeout is specified for add, TimedOutException is thrown if pending task count doesn't decrease
in the timeout interval. If add() is called by a ThreadManager worker thread and the task cannot
be added, a TooManyPendingTasksException is thrown rather than blocking, since deadlocks can ensue
if worker threads block waiting for works threads to complete tasks.
Reviewed By: mcslee, aditya
Revert Plan: revertible
Test Plan: concurrency/test/ThreadManagerTests.h
run concurrency-test thread-manager
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665120 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
index 9f04435..89e6843 100644
--- a/lib/cpp/src/concurrency/test/ThreadManagerTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
@@ -21,7 +21,7 @@
using namespace facebook::thrift::concurrency;
/**
- * ThreadManagerTests class
+ * ThreadManagerTests class
*
* @author marc
* @version $Id:$
@@ -35,8 +35,8 @@
class Task: public Runnable {
public:
-
- Task(Monitor& monitor, size_t& count, long long timeout) :
+
+ Task(Monitor& monitor, size_t& count, long long timeout) :
_monitor(monitor),
_count(count),
_timeout(timeout),
@@ -49,27 +49,33 @@
{
Synchronized s(_sleep);
- _sleep.wait(_timeout);
+ try {
+ _sleep.wait(_timeout);
+ } catch(TimedOutException& e) {
+ ;
+ }catch(...) {
+ assert(0);
+ }
}
_endTime = Util::currentTime();
_done = true;
-
+
{
Synchronized s(_monitor);
// std::cout << "Thread " << _count << " completed " << std::endl;
-
+
_count--;
if (_count == 0) {
-
+
_monitor.notify();
}
}
}
-
+
Monitor& _monitor;
size_t& _count;
long long _timeout;
@@ -95,11 +101,11 @@
shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
threadFactory->priority(PosixThreadFactory::HIGHEST);
-
+
threadManager->threadFactory(threadFactory);
threadManager->start();
-
+
std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
for (size_t ix = 0; ix < count; ix++) {
@@ -118,7 +124,7 @@
Synchronized s(monitor);
while(activeCount > 0) {
-
+
monitor.wait();
}
}
@@ -133,7 +139,7 @@
long long maxTime = 0;
for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
-
+
shared_ptr<ThreadManagerTests::Task> task = *ix;
long long delta = task->_endTime - task->_startTime;
@@ -158,7 +164,7 @@
averageTime+= delta;
}
-
+
averageTime /= count;
std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl;
@@ -177,6 +183,167 @@
return success;
}
+
+ class BlockTask: public Runnable {
+
+ public:
+
+ BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) :
+ _monitor(monitor),
+ _bmonitor(bmonitor),
+ _count(count) {}
+
+ void run() {
+ {
+ Synchronized s(_bmonitor);
+
+ _bmonitor.wait();
+
+ }
+
+ {
+ Synchronized s(_monitor);
+
+ _count--;
+
+ if (_count == 0) {
+
+ _monitor.notify();
+ }
+ }
+ }
+
+ Monitor& _monitor;
+ Monitor& _bmonitor;
+ size_t& _count;
+ };
+
+ /**
+ * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the
+ * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */
+
+ bool blockTest(long long timeout=100LL, size_t workerCount=2) {
+
+ bool success = false;
+
+ try {
+
+ Monitor bmonitor;
+ Monitor monitor;
+
+ size_t pendingTaskMaxCount = workerCount;
+
+ size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
+
+ shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
+
+ shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+
+ threadFactory->priority(PosixThreadFactory::HIGHEST);
+
+ threadManager->threadFactory(threadFactory);
+
+ threadManager->start();
+
+ std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
+
+ for (size_t ix = 0; ix < workerCount; ix++) {
+
+ tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0])));
+ }
+
+ for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
+
+ tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1])));
+ }
+
+ for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+ threadManager->add(*ix);
+ }
+
+ if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
+ throw TException("Unexpected pending task count");
+ }
+
+ shared_ptr<ThreadManagerTests::BlockTask> extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));
+
+ try {
+ threadManager->add(extraTask, 1);
+ throw TException("Unexpected success adding task in excess of pending task count");
+ } catch(TimedOutException& e) {
+ }
+
+ std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
+
+ {
+ Synchronized s(bmonitor);
+
+ bmonitor.notifyAll();
+ }
+
+ {
+ Synchronized s(monitor);
+
+ while(activeCounts[0] != 0) {
+ monitor.wait();
+ }
+ }
+
+ std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
+
+ try {
+ threadManager->add(extraTask, 1);
+ } catch(TimedOutException& e) {
+ std::cout << "\t\t\t" << "add timed out unexpectedly" << std::endl;
+ throw TException("Unexpected timeout adding task");
+
+ } catch(TooManyPendingTasksException& e) {
+ std::cout << "\t\t\t" << "add encountered too many pending exepctions" << std::endl;
+ throw TException("Unexpected timeout adding task");
+ }
+
+ // Wake up tasks that were pending before and wait for them to complete
+
+ {
+ Synchronized s(bmonitor);
+
+ bmonitor.notifyAll();
+ }
+
+ {
+ Synchronized s(monitor);
+
+ while(activeCounts[1] != 0) {
+ monitor.wait();
+ }
+ }
+
+ // Wake up the extra task and wait for it to complete
+
+ {
+ Synchronized s(bmonitor);
+
+ bmonitor.notifyAll();
+ }
+
+ {
+ Synchronized s(monitor);
+
+ while(activeCounts[2] != 0) {
+ monitor.wait();
+ }
+ }
+
+ if(!(success = (threadManager->totalTaskCount() == 0))) {
+ throw TException("Unexpected pending task count");
+ }
+
+ } catch(TException& e) {
+ }
+
+ std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
+ return success;
+ }
};
const double ThreadManagerTests::ERROR = .20;