blob: e12201c0cf4443577a75c1c17b18bfb3faf99d86 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Marc Slemkoc7782972006-07-25 02:26:35 +000020#include <config.h>
Marc Slemko6f038a72006-08-03 18:58:09 +000021#include <concurrency/ThreadManager.h>
Roger Meier3faaedf2011-10-02 10:51:45 +000022#include <concurrency/PlatformThreadFactory.h>
Marc Slemko6f038a72006-08-03 18:58:09 +000023#include <concurrency/Monitor.h>
24#include <concurrency/Util.h>
Marc Slemko740343d2006-07-20 00:31:02 +000025
26#include <assert.h>
27#include <set>
28#include <iostream>
Marc Slemkoc7782972006-07-25 02:26:35 +000029#include <set>
30#include <stdint.h>
Marc Slemko740343d2006-07-20 00:31:02 +000031
T Jake Lucianib5e62212009-01-31 22:36:20 +000032namespace apache { namespace thrift { namespace concurrency { namespace test {
Marc Slemko740343d2006-07-20 00:31:02 +000033
T Jake Lucianib5e62212009-01-31 22:36:20 +000034using namespace apache::thrift::concurrency;
Marc Slemko740343d2006-07-20 00:31:02 +000035
Mark Sleef5f2be42006-09-05 21:05:31 +000036/**
Marc Slemko3a3b53b2007-05-22 23:59:54 +000037 * ThreadManagerTests class
Mark Sleef5f2be42006-09-05 21:05:31 +000038 *
Mark Sleef5f2be42006-09-05 21:05:31 +000039 * @version $Id:$
40 */
Marc Slemko740343d2006-07-20 00:31:02 +000041class ThreadManagerTests {
42
43public:
44
Marc Slemko6f038a72006-08-03 18:58:09 +000045 static const double ERROR;
46
Marc Slemko740343d2006-07-20 00:31:02 +000047 class Task: public Runnable {
48
49 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000050
Mark Slee9b82d272007-05-23 05:16:07 +000051 Task(Monitor& monitor, size_t& count, int64_t timeout) :
Marc Slemko740343d2006-07-20 00:31:02 +000052 _monitor(monitor),
53 _count(count),
54 _timeout(timeout),
Marc Slemko740343d2006-07-20 00:31:02 +000055 _done(false) {}
56
57 void run() {
58
Marc Slemkoc7782972006-07-25 02:26:35 +000059 _startTime = Util::currentTime();
Marc Slemko740343d2006-07-20 00:31:02 +000060
Mark Sleef5f2be42006-09-05 21:05:31 +000061 {
62 Synchronized s(_sleep);
Marc Slemko740343d2006-07-20 00:31:02 +000063
Marc Slemko3a3b53b2007-05-22 23:59:54 +000064 try {
65 _sleep.wait(_timeout);
66 } catch(TimedOutException& e) {
67 ;
68 }catch(...) {
69 assert(0);
70 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000071 }
Marc Slemko740343d2006-07-20 00:31:02 +000072
Marc Slemkoc7782972006-07-25 02:26:35 +000073 _endTime = Util::currentTime();
74
Marc Slemko740343d2006-07-20 00:31:02 +000075 _done = true;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000076
Mark Sleef5f2be42006-09-05 21:05:31 +000077 {
78 Synchronized s(_monitor);
Marc Slemko740343d2006-07-20 00:31:02 +000079
David Reiss96d23882007-07-26 21:10:32 +000080 // std::cout << "Thread " << _count << " completed " << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000081
David Reiss96d23882007-07-26 21:10:32 +000082 _count--;
Marc Slemko740343d2006-07-20 00:31:02 +000083
David Reiss96d23882007-07-26 21:10:32 +000084 if (_count == 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000085
David Reiss96d23882007-07-26 21:10:32 +000086 _monitor.notify();
87 }
Marc Slemko740343d2006-07-20 00:31:02 +000088 }
89 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000090
Marc Slemko740343d2006-07-20 00:31:02 +000091 Monitor& _monitor;
92 size_t& _count;
Mark Slee9b82d272007-05-23 05:16:07 +000093 int64_t _timeout;
94 int64_t _startTime;
95 int64_t _endTime;
Marc Slemko740343d2006-07-20 00:31:02 +000096 bool _done;
Marc Slemkoc7782972006-07-25 02:26:35 +000097 Monitor _sleep;
Marc Slemko740343d2006-07-20 00:31:02 +000098 };
99
Mark Sleef5f2be42006-09-05 21:05:31 +0000100 /**
101 * Dispatch count tasks, each of which blocks for timeout milliseconds then
102 * completes. Verify that all tasks completed and that thread manager cleans
103 * up properly on delete.
104 */
Mark Slee9b82d272007-05-23 05:16:07 +0000105 bool loadTest(size_t count=100, int64_t timeout=100LL, size_t workerCount=4) {
Marc Slemko740343d2006-07-20 00:31:02 +0000106
107 Monitor monitor;
108
109 size_t activeCount = count;
110
Marc Slemko6f038a72006-08-03 18:58:09 +0000111 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
Marc Slemkoc7782972006-07-25 02:26:35 +0000112
Roger Meier3faaedf2011-10-02 10:51:45 +0000113 shared_ptr<PlatformThreadFactory> threadFactory = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
Marc Slemkoc7782972006-07-25 02:26:35 +0000114
Roger Meier3faaedf2011-10-02 10:51:45 +0000115#ifndef USE_BOOST_THREAD
Marc Slemkoa6479032007-06-05 22:20:14 +0000116 threadFactory->setPriority(PosixThreadFactory::HIGHEST);
Roger Meier3faaedf2011-10-02 10:51:45 +0000117#endif
Marc Slemkoc7782972006-07-25 02:26:35 +0000118 threadManager->threadFactory(threadFactory);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000119
120 threadManager->start();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000121
Marc Slemko6f038a72006-08-03 18:58:09 +0000122 std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
Marc Slemko740343d2006-07-20 00:31:02 +0000123
Mark Sleef5f2be42006-09-05 21:05:31 +0000124 for (size_t ix = 0; ix < count; ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000125
Marc Slemko6f038a72006-08-03 18:58:09 +0000126 tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));
Marc Slemko740343d2006-07-20 00:31:02 +0000127 }
128
Mark Slee9b82d272007-05-23 05:16:07 +0000129 int64_t time00 = Util::currentTime();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000130
Mark Sleef5f2be42006-09-05 21:05:31 +0000131 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000132
David Reiss96d23882007-07-26 21:10:32 +0000133 threadManager->add(*ix);
Marc Slemko740343d2006-07-20 00:31:02 +0000134 }
135
Mark Sleef5f2be42006-09-05 21:05:31 +0000136 {
137 Synchronized s(monitor);
138
Marc Slemko740343d2006-07-20 00:31:02 +0000139 while(activeCount > 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000140
David Reiss96d23882007-07-26 21:10:32 +0000141 monitor.wait();
Marc Slemko740343d2006-07-20 00:31:02 +0000142 }
143 }
144
Mark Slee9b82d272007-05-23 05:16:07 +0000145 int64_t time01 = Util::currentTime();
Marc Slemko740343d2006-07-20 00:31:02 +0000146
Mark Slee9b82d272007-05-23 05:16:07 +0000147 int64_t firstTime = 9223372036854775807LL;
148 int64_t lastTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000149
150 double averageTime = 0;
Mark Slee9b82d272007-05-23 05:16:07 +0000151 int64_t minTime = 9223372036854775807LL;
152 int64_t maxTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000153
Mark Sleef5f2be42006-09-05 21:05:31 +0000154 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000155
Marc Slemko6f038a72006-08-03 18:58:09 +0000156 shared_ptr<ThreadManagerTests::Task> task = *ix;
Marc Slemkoc7782972006-07-25 02:26:35 +0000157
Mark Slee9b82d272007-05-23 05:16:07 +0000158 int64_t delta = task->_endTime - task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000159
160 assert(delta > 0);
161
Mark Sleef5f2be42006-09-05 21:05:31 +0000162 if (task->_startTime < firstTime) {
David Reiss96d23882007-07-26 21:10:32 +0000163 firstTime = task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000164 }
165
Mark Sleef5f2be42006-09-05 21:05:31 +0000166 if (task->_endTime > lastTime) {
David Reiss96d23882007-07-26 21:10:32 +0000167 lastTime = task->_endTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000168 }
169
Mark Sleef5f2be42006-09-05 21:05:31 +0000170 if (delta < minTime) {
David Reiss96d23882007-07-26 21:10:32 +0000171 minTime = delta;
Marc Slemkoc7782972006-07-25 02:26:35 +0000172 }
173
Mark Sleef5f2be42006-09-05 21:05:31 +0000174 if (delta > maxTime) {
David Reiss96d23882007-07-26 21:10:32 +0000175 maxTime = delta;
Marc Slemkoc7782972006-07-25 02:26:35 +0000176 }
177
178 averageTime+= delta;
Marc Slemko740343d2006-07-20 00:31:02 +0000179 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000180
Marc Slemkoc7782972006-07-25 02:26:35 +0000181 averageTime /= count;
182
183 std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl;
Marc Slemko740343d2006-07-20 00:31:02 +0000184
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000185 double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout;
186
187 double error = ((time01 - time00) - expectedTime) / expectedTime;
188
Mark Sleef5f2be42006-09-05 21:05:31 +0000189 if (error < 0) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000190 error*= -1.0;
191 }
192
Marc Slemko6f038a72006-08-03 18:58:09 +0000193 bool success = error < ERROR;
Marc Slemkoc7782972006-07-25 02:26:35 +0000194
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000195 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl;
Marc Slemko740343d2006-07-20 00:31:02 +0000196
Marc Slemkoc7782972006-07-25 02:26:35 +0000197 return success;
Marc Slemko740343d2006-07-20 00:31:02 +0000198 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000199
200 class BlockTask: public Runnable {
201
202 public:
203
204 BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) :
205 _monitor(monitor),
206 _bmonitor(bmonitor),
207 _count(count) {}
208
209 void run() {
210 {
211 Synchronized s(_bmonitor);
212
213 _bmonitor.wait();
214
215 }
216
217 {
218 Synchronized s(_monitor);
219
220 _count--;
221
222 if (_count == 0) {
223
224 _monitor.notify();
225 }
226 }
227 }
228
229 Monitor& _monitor;
230 Monitor& _bmonitor;
231 size_t& _count;
232 };
233
234 /**
235 * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the
236 * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */
237
Mark Slee9b82d272007-05-23 05:16:07 +0000238 bool blockTest(int64_t timeout=100LL, size_t workerCount=2) {
Roger Meier3b771a12010-11-17 22:11:26 +0000239 (void) timeout;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000240 bool success = false;
241
242 try {
243
244 Monitor bmonitor;
245 Monitor monitor;
246
247 size_t pendingTaskMaxCount = workerCount;
248
249 size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
250
251 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
252
Roger Meier3faaedf2011-10-02 10:51:45 +0000253 shared_ptr<PlatformThreadFactory> threadFactory = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000254
Roger Meier3faaedf2011-10-02 10:51:45 +0000255#ifndef USE_BOOST_THREAD
Marc Slemkoa6479032007-06-05 22:20:14 +0000256 threadFactory->setPriority(PosixThreadFactory::HIGHEST);
Roger Meier3faaedf2011-10-02 10:51:45 +0000257#endif
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000258 threadManager->threadFactory(threadFactory);
259
260 threadManager->start();
261
262 std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
263
264 for (size_t ix = 0; ix < workerCount; ix++) {
265
266 tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0])));
267 }
268
269 for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
270
271 tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1])));
272 }
273
274 for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000275 threadManager->add(*ix);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000276 }
277
278 if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
279 throw TException("Unexpected pending task count");
280 }
281
282 shared_ptr<ThreadManagerTests::BlockTask> extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));
283
284 try {
285 threadManager->add(extraTask, 1);
286 throw TException("Unexpected success adding task in excess of pending task count");
David Reiss9fcacc82009-06-04 00:32:54 +0000287 } catch(TooManyPendingTasksException& e) {
288 throw TException("Should have timed out adding task in excess of pending task count");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000289 } catch(TimedOutException& e) {
David Reiss9fcacc82009-06-04 00:32:54 +0000290 // Expected result
291 }
292
293 try {
294 threadManager->add(extraTask, -1);
295 throw TException("Unexpected success adding task in excess of pending task count");
296 } catch(TimedOutException& e) {
297 throw TException("Unexpected timeout adding task in excess of pending task count");
298 } catch(TooManyPendingTasksException& e) {
299 // Expected result
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000300 }
301
302 std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
303
304 {
305 Synchronized s(bmonitor);
306
307 bmonitor.notifyAll();
308 }
309
310 {
311 Synchronized s(monitor);
312
313 while(activeCounts[0] != 0) {
314 monitor.wait();
315 }
316 }
317
318 std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
319
320 try {
321 threadManager->add(extraTask, 1);
322 } catch(TimedOutException& e) {
323 std::cout << "\t\t\t" << "add timed out unexpectedly" << std::endl;
324 throw TException("Unexpected timeout adding task");
325
326 } catch(TooManyPendingTasksException& e) {
327 std::cout << "\t\t\t" << "add encountered too many pending exepctions" << std::endl;
328 throw TException("Unexpected timeout adding task");
329 }
330
331 // Wake up tasks that were pending before and wait for them to complete
332
333 {
334 Synchronized s(bmonitor);
335
336 bmonitor.notifyAll();
337 }
338
339 {
340 Synchronized s(monitor);
341
342 while(activeCounts[1] != 0) {
343 monitor.wait();
344 }
345 }
346
347 // Wake up the extra task and wait for it to complete
348
349 {
350 Synchronized s(bmonitor);
351
352 bmonitor.notifyAll();
353 }
354
355 {
356 Synchronized s(monitor);
357
358 while(activeCounts[2] != 0) {
359 monitor.wait();
360 }
361 }
362
363 if(!(success = (threadManager->totalTaskCount() == 0))) {
364 throw TException("Unexpected pending task count");
365 }
366
367 } catch(TException& e) {
David Reiss9fcacc82009-06-04 00:32:54 +0000368 std::cout << "ERROR: " << e.what() << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000369 }
370
371 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
372 return success;
373 }
Marc Slemko740343d2006-07-20 00:31:02 +0000374};
Marc Slemko6f038a72006-08-03 18:58:09 +0000375
376const double ThreadManagerTests::ERROR = .20;
377
T Jake Lucianib5e62212009-01-31 22:36:20 +0000378}}}} // apache::thrift::concurrency
Marc Slemko740343d2006-07-20 00:31:02 +0000379
T Jake Lucianib5e62212009-01-31 22:36:20 +0000380using namespace apache::thrift::concurrency::test;
Marc Slemko740343d2006-07-20 00:31:02 +0000381