blob: 650f17fd888d42d2157997ea6a1fa242837f0e44 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Marc Slemkoc7782972006-07-25 02:26:35 +00007#include <config.h>
Marc Slemko6f038a72006-08-03 18:58:09 +00008#include <concurrency/ThreadManager.h>
9#include <concurrency/PosixThreadFactory.h>
10#include <concurrency/Monitor.h>
11#include <concurrency/Util.h>
Marc Slemko740343d2006-07-20 00:31:02 +000012
13#include <assert.h>
14#include <set>
15#include <iostream>
Marc Slemkoc7782972006-07-25 02:26:35 +000016#include <set>
17#include <stdint.h>
Marc Slemko740343d2006-07-20 00:31:02 +000018
T Jake Lucianib5e62212009-01-31 22:36:20 +000019namespace apache { namespace thrift { namespace concurrency { namespace test {
Marc Slemko740343d2006-07-20 00:31:02 +000020
T Jake Lucianib5e62212009-01-31 22:36:20 +000021using namespace apache::thrift::concurrency;
Marc Slemko740343d2006-07-20 00:31:02 +000022
Mark Sleef5f2be42006-09-05 21:05:31 +000023/**
Marc Slemko3a3b53b2007-05-22 23:59:54 +000024 * ThreadManagerTests class
Mark Sleef5f2be42006-09-05 21:05:31 +000025 *
Mark Sleef5f2be42006-09-05 21:05:31 +000026 * @version $Id:$
27 */
Marc Slemko740343d2006-07-20 00:31:02 +000028class ThreadManagerTests {
29
30public:
31
Marc Slemko6f038a72006-08-03 18:58:09 +000032 static const double ERROR;
33
Marc Slemko740343d2006-07-20 00:31:02 +000034 class Task: public Runnable {
35
36 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000037
Mark Slee9b82d272007-05-23 05:16:07 +000038 Task(Monitor& monitor, size_t& count, int64_t timeout) :
Marc Slemko740343d2006-07-20 00:31:02 +000039 _monitor(monitor),
40 _count(count),
41 _timeout(timeout),
Marc Slemko740343d2006-07-20 00:31:02 +000042 _done(false) {}
43
44 void run() {
45
Marc Slemkoc7782972006-07-25 02:26:35 +000046 _startTime = Util::currentTime();
Marc Slemko740343d2006-07-20 00:31:02 +000047
Mark Sleef5f2be42006-09-05 21:05:31 +000048 {
49 Synchronized s(_sleep);
Marc Slemko740343d2006-07-20 00:31:02 +000050
Marc Slemko3a3b53b2007-05-22 23:59:54 +000051 try {
52 _sleep.wait(_timeout);
53 } catch(TimedOutException& e) {
54 ;
55 }catch(...) {
56 assert(0);
57 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000058 }
Marc Slemko740343d2006-07-20 00:31:02 +000059
Marc Slemkoc7782972006-07-25 02:26:35 +000060 _endTime = Util::currentTime();
61
Marc Slemko740343d2006-07-20 00:31:02 +000062 _done = true;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000063
Mark Sleef5f2be42006-09-05 21:05:31 +000064 {
65 Synchronized s(_monitor);
Marc Slemko740343d2006-07-20 00:31:02 +000066
David Reiss96d23882007-07-26 21:10:32 +000067 // std::cout << "Thread " << _count << " completed " << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000068
David Reiss96d23882007-07-26 21:10:32 +000069 _count--;
Marc Slemko740343d2006-07-20 00:31:02 +000070
David Reiss96d23882007-07-26 21:10:32 +000071 if (_count == 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000072
David Reiss96d23882007-07-26 21:10:32 +000073 _monitor.notify();
74 }
Marc Slemko740343d2006-07-20 00:31:02 +000075 }
76 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000077
Marc Slemko740343d2006-07-20 00:31:02 +000078 Monitor& _monitor;
79 size_t& _count;
Mark Slee9b82d272007-05-23 05:16:07 +000080 int64_t _timeout;
81 int64_t _startTime;
82 int64_t _endTime;
Marc Slemko740343d2006-07-20 00:31:02 +000083 bool _done;
Marc Slemkoc7782972006-07-25 02:26:35 +000084 Monitor _sleep;
Marc Slemko740343d2006-07-20 00:31:02 +000085 };
86
Mark Sleef5f2be42006-09-05 21:05:31 +000087 /**
88 * Dispatch count tasks, each of which blocks for timeout milliseconds then
89 * completes. Verify that all tasks completed and that thread manager cleans
90 * up properly on delete.
91 */
Mark Slee9b82d272007-05-23 05:16:07 +000092 bool loadTest(size_t count=100, int64_t timeout=100LL, size_t workerCount=4) {
Marc Slemko740343d2006-07-20 00:31:02 +000093
94 Monitor monitor;
95
96 size_t activeCount = count;
97
Marc Slemko6f038a72006-08-03 18:58:09 +000098 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
Marc Slemkoc7782972006-07-25 02:26:35 +000099
Marc Slemko6f038a72006-08-03 18:58:09 +0000100 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
Marc Slemkoc7782972006-07-25 02:26:35 +0000101
Marc Slemkoa6479032007-06-05 22:20:14 +0000102 threadFactory->setPriority(PosixThreadFactory::HIGHEST);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000103
Marc Slemkoc7782972006-07-25 02:26:35 +0000104 threadManager->threadFactory(threadFactory);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000105
106 threadManager->start();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000107
Marc Slemko6f038a72006-08-03 18:58:09 +0000108 std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
Marc Slemko740343d2006-07-20 00:31:02 +0000109
Mark Sleef5f2be42006-09-05 21:05:31 +0000110 for (size_t ix = 0; ix < count; ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000111
Marc Slemko6f038a72006-08-03 18:58:09 +0000112 tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));
Marc Slemko740343d2006-07-20 00:31:02 +0000113 }
114
Mark Slee9b82d272007-05-23 05:16:07 +0000115 int64_t time00 = Util::currentTime();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000116
Mark Sleef5f2be42006-09-05 21:05:31 +0000117 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000118
David Reiss96d23882007-07-26 21:10:32 +0000119 threadManager->add(*ix);
Marc Slemko740343d2006-07-20 00:31:02 +0000120 }
121
Mark Sleef5f2be42006-09-05 21:05:31 +0000122 {
123 Synchronized s(monitor);
124
Marc Slemko740343d2006-07-20 00:31:02 +0000125 while(activeCount > 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000126
David Reiss96d23882007-07-26 21:10:32 +0000127 monitor.wait();
Marc Slemko740343d2006-07-20 00:31:02 +0000128 }
129 }
130
Mark Slee9b82d272007-05-23 05:16:07 +0000131 int64_t time01 = Util::currentTime();
Marc Slemko740343d2006-07-20 00:31:02 +0000132
Mark Slee9b82d272007-05-23 05:16:07 +0000133 int64_t firstTime = 9223372036854775807LL;
134 int64_t lastTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000135
136 double averageTime = 0;
Mark Slee9b82d272007-05-23 05:16:07 +0000137 int64_t minTime = 9223372036854775807LL;
138 int64_t maxTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000139
Mark Sleef5f2be42006-09-05 21:05:31 +0000140 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000141
Marc Slemko6f038a72006-08-03 18:58:09 +0000142 shared_ptr<ThreadManagerTests::Task> task = *ix;
Marc Slemkoc7782972006-07-25 02:26:35 +0000143
Mark Slee9b82d272007-05-23 05:16:07 +0000144 int64_t delta = task->_endTime - task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000145
146 assert(delta > 0);
147
Mark Sleef5f2be42006-09-05 21:05:31 +0000148 if (task->_startTime < firstTime) {
David Reiss96d23882007-07-26 21:10:32 +0000149 firstTime = task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000150 }
151
Mark Sleef5f2be42006-09-05 21:05:31 +0000152 if (task->_endTime > lastTime) {
David Reiss96d23882007-07-26 21:10:32 +0000153 lastTime = task->_endTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000154 }
155
Mark Sleef5f2be42006-09-05 21:05:31 +0000156 if (delta < minTime) {
David Reiss96d23882007-07-26 21:10:32 +0000157 minTime = delta;
Marc Slemkoc7782972006-07-25 02:26:35 +0000158 }
159
Mark Sleef5f2be42006-09-05 21:05:31 +0000160 if (delta > maxTime) {
David Reiss96d23882007-07-26 21:10:32 +0000161 maxTime = delta;
Marc Slemkoc7782972006-07-25 02:26:35 +0000162 }
163
164 averageTime+= delta;
Marc Slemko740343d2006-07-20 00:31:02 +0000165 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000166
Marc Slemkoc7782972006-07-25 02:26:35 +0000167 averageTime /= count;
168
169 std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl;
Marc Slemko740343d2006-07-20 00:31:02 +0000170
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000171 double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout;
172
173 double error = ((time01 - time00) - expectedTime) / expectedTime;
174
Mark Sleef5f2be42006-09-05 21:05:31 +0000175 if (error < 0) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000176 error*= -1.0;
177 }
178
Marc Slemko6f038a72006-08-03 18:58:09 +0000179 bool success = error < ERROR;
Marc Slemkoc7782972006-07-25 02:26:35 +0000180
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000181 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl;
Marc Slemko740343d2006-07-20 00:31:02 +0000182
Marc Slemkoc7782972006-07-25 02:26:35 +0000183 return success;
Marc Slemko740343d2006-07-20 00:31:02 +0000184 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000185
186 class BlockTask: public Runnable {
187
188 public:
189
190 BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) :
191 _monitor(monitor),
192 _bmonitor(bmonitor),
193 _count(count) {}
194
195 void run() {
196 {
197 Synchronized s(_bmonitor);
198
199 _bmonitor.wait();
200
201 }
202
203 {
204 Synchronized s(_monitor);
205
206 _count--;
207
208 if (_count == 0) {
209
210 _monitor.notify();
211 }
212 }
213 }
214
215 Monitor& _monitor;
216 Monitor& _bmonitor;
217 size_t& _count;
218 };
219
220 /**
221 * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the
222 * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */
223
Mark Slee9b82d272007-05-23 05:16:07 +0000224 bool blockTest(int64_t timeout=100LL, size_t workerCount=2) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000225
226 bool success = false;
227
228 try {
229
230 Monitor bmonitor;
231 Monitor monitor;
232
233 size_t pendingTaskMaxCount = workerCount;
234
235 size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
236
237 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
238
239 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
240
Marc Slemkoa6479032007-06-05 22:20:14 +0000241 threadFactory->setPriority(PosixThreadFactory::HIGHEST);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000242
243 threadManager->threadFactory(threadFactory);
244
245 threadManager->start();
246
247 std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
248
249 for (size_t ix = 0; ix < workerCount; ix++) {
250
251 tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0])));
252 }
253
254 for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
255
256 tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1])));
257 }
258
259 for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000260 threadManager->add(*ix);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000261 }
262
263 if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
264 throw TException("Unexpected pending task count");
265 }
266
267 shared_ptr<ThreadManagerTests::BlockTask> extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));
268
269 try {
270 threadManager->add(extraTask, 1);
271 throw TException("Unexpected success adding task in excess of pending task count");
272 } catch(TimedOutException& e) {
273 }
274
275 std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
276
277 {
278 Synchronized s(bmonitor);
279
280 bmonitor.notifyAll();
281 }
282
283 {
284 Synchronized s(monitor);
285
286 while(activeCounts[0] != 0) {
287 monitor.wait();
288 }
289 }
290
291 std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
292
293 try {
294 threadManager->add(extraTask, 1);
295 } catch(TimedOutException& e) {
296 std::cout << "\t\t\t" << "add timed out unexpectedly" << std::endl;
297 throw TException("Unexpected timeout adding task");
298
299 } catch(TooManyPendingTasksException& e) {
300 std::cout << "\t\t\t" << "add encountered too many pending exepctions" << std::endl;
301 throw TException("Unexpected timeout adding task");
302 }
303
304 // Wake up tasks that were pending before and wait for them to complete
305
306 {
307 Synchronized s(bmonitor);
308
309 bmonitor.notifyAll();
310 }
311
312 {
313 Synchronized s(monitor);
314
315 while(activeCounts[1] != 0) {
316 monitor.wait();
317 }
318 }
319
320 // Wake up the extra task and wait for it to complete
321
322 {
323 Synchronized s(bmonitor);
324
325 bmonitor.notifyAll();
326 }
327
328 {
329 Synchronized s(monitor);
330
331 while(activeCounts[2] != 0) {
332 monitor.wait();
333 }
334 }
335
336 if(!(success = (threadManager->totalTaskCount() == 0))) {
337 throw TException("Unexpected pending task count");
338 }
339
340 } catch(TException& e) {
341 }
342
343 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
344 return success;
345 }
Marc Slemko740343d2006-07-20 00:31:02 +0000346};
Marc Slemko6f038a72006-08-03 18:58:09 +0000347
348const double ThreadManagerTests::ERROR = .20;
349
T Jake Lucianib5e62212009-01-31 22:36:20 +0000350}}}} // apache::thrift::concurrency
Marc Slemko740343d2006-07-20 00:31:02 +0000351
T Jake Lucianib5e62212009-01-31 22:36:20 +0000352using namespace apache::thrift::concurrency::test;
Marc Slemko740343d2006-07-20 00:31:02 +0000353