blob: b6b5c3e47e3ee7550db63aed4240b2a228c47cfe [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Marc Slemkoc7782972006-07-25 02:26:35 +000020#include <config.h>
Marc Slemko6f038a72006-08-03 18:58:09 +000021#include <concurrency/ThreadManager.h>
22#include <concurrency/PosixThreadFactory.h>
23#include <concurrency/Monitor.h>
24#include <concurrency/Util.h>
Marc Slemko740343d2006-07-20 00:31:02 +000025
26#include <assert.h>
27#include <set>
28#include <iostream>
Marc Slemkoc7782972006-07-25 02:26:35 +000029#include <set>
30#include <stdint.h>
Marc Slemko740343d2006-07-20 00:31:02 +000031
T Jake Lucianib5e62212009-01-31 22:36:20 +000032namespace apache { namespace thrift { namespace concurrency { namespace test {
Marc Slemko740343d2006-07-20 00:31:02 +000033
T Jake Lucianib5e62212009-01-31 22:36:20 +000034using namespace apache::thrift::concurrency;
Marc Slemko740343d2006-07-20 00:31:02 +000035
Mark Sleef5f2be42006-09-05 21:05:31 +000036/**
Marc Slemko3a3b53b2007-05-22 23:59:54 +000037 * ThreadManagerTests class
Mark Sleef5f2be42006-09-05 21:05:31 +000038 *
Mark Sleef5f2be42006-09-05 21:05:31 +000039 * @version $Id:$
40 */
Marc Slemko740343d2006-07-20 00:31:02 +000041class ThreadManagerTests {
42
43public:
44
Marc Slemko6f038a72006-08-03 18:58:09 +000045 static const double ERROR;
46
Marc Slemko740343d2006-07-20 00:31:02 +000047 class Task: public Runnable {
48
49 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000050
Mark Slee9b82d272007-05-23 05:16:07 +000051 Task(Monitor& monitor, size_t& count, int64_t timeout) :
Marc Slemko740343d2006-07-20 00:31:02 +000052 _monitor(monitor),
53 _count(count),
54 _timeout(timeout),
Marc Slemko740343d2006-07-20 00:31:02 +000055 _done(false) {}
56
57 void run() {
58
Marc Slemkoc7782972006-07-25 02:26:35 +000059 _startTime = Util::currentTime();
Marc Slemko740343d2006-07-20 00:31:02 +000060
Mark Sleef5f2be42006-09-05 21:05:31 +000061 {
62 Synchronized s(_sleep);
Marc Slemko740343d2006-07-20 00:31:02 +000063
Marc Slemko3a3b53b2007-05-22 23:59:54 +000064 try {
65 _sleep.wait(_timeout);
66 } catch(TimedOutException& e) {
67 ;
68 }catch(...) {
69 assert(0);
70 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000071 }
Marc Slemko740343d2006-07-20 00:31:02 +000072
Marc Slemkoc7782972006-07-25 02:26:35 +000073 _endTime = Util::currentTime();
74
Marc Slemko740343d2006-07-20 00:31:02 +000075 _done = true;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000076
Mark Sleef5f2be42006-09-05 21:05:31 +000077 {
78 Synchronized s(_monitor);
Marc Slemko740343d2006-07-20 00:31:02 +000079
David Reiss96d23882007-07-26 21:10:32 +000080 // std::cout << "Thread " << _count << " completed " << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000081
David Reiss96d23882007-07-26 21:10:32 +000082 _count--;
Marc Slemko740343d2006-07-20 00:31:02 +000083
David Reiss96d23882007-07-26 21:10:32 +000084 if (_count == 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000085
David Reiss96d23882007-07-26 21:10:32 +000086 _monitor.notify();
87 }
Marc Slemko740343d2006-07-20 00:31:02 +000088 }
89 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000090
Marc Slemko740343d2006-07-20 00:31:02 +000091 Monitor& _monitor;
92 size_t& _count;
Mark Slee9b82d272007-05-23 05:16:07 +000093 int64_t _timeout;
94 int64_t _startTime;
95 int64_t _endTime;
Marc Slemko740343d2006-07-20 00:31:02 +000096 bool _done;
Marc Slemkoc7782972006-07-25 02:26:35 +000097 Monitor _sleep;
Marc Slemko740343d2006-07-20 00:31:02 +000098 };
99
Mark Sleef5f2be42006-09-05 21:05:31 +0000100 /**
101 * Dispatch count tasks, each of which blocks for timeout milliseconds then
102 * completes. Verify that all tasks completed and that thread manager cleans
103 * up properly on delete.
104 */
Mark Slee9b82d272007-05-23 05:16:07 +0000105 bool loadTest(size_t count=100, int64_t timeout=100LL, size_t workerCount=4) {
Marc Slemko740343d2006-07-20 00:31:02 +0000106
107 Monitor monitor;
108
109 size_t activeCount = count;
110
Marc Slemko6f038a72006-08-03 18:58:09 +0000111 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
Marc Slemkoc7782972006-07-25 02:26:35 +0000112
Marc Slemko6f038a72006-08-03 18:58:09 +0000113 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
Marc Slemkoc7782972006-07-25 02:26:35 +0000114
Marc Slemkoa6479032007-06-05 22:20:14 +0000115 threadFactory->setPriority(PosixThreadFactory::HIGHEST);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000116
Marc Slemkoc7782972006-07-25 02:26:35 +0000117 threadManager->threadFactory(threadFactory);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000118
119 threadManager->start();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000120
Marc Slemko6f038a72006-08-03 18:58:09 +0000121 std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
Marc Slemko740343d2006-07-20 00:31:02 +0000122
Mark Sleef5f2be42006-09-05 21:05:31 +0000123 for (size_t ix = 0; ix < count; ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000124
Marc Slemko6f038a72006-08-03 18:58:09 +0000125 tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));
Marc Slemko740343d2006-07-20 00:31:02 +0000126 }
127
Mark Slee9b82d272007-05-23 05:16:07 +0000128 int64_t time00 = Util::currentTime();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000129
Mark Sleef5f2be42006-09-05 21:05:31 +0000130 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000131
David Reiss96d23882007-07-26 21:10:32 +0000132 threadManager->add(*ix);
Marc Slemko740343d2006-07-20 00:31:02 +0000133 }
134
Mark Sleef5f2be42006-09-05 21:05:31 +0000135 {
136 Synchronized s(monitor);
137
Marc Slemko740343d2006-07-20 00:31:02 +0000138 while(activeCount > 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000139
David Reiss96d23882007-07-26 21:10:32 +0000140 monitor.wait();
Marc Slemko740343d2006-07-20 00:31:02 +0000141 }
142 }
143
Mark Slee9b82d272007-05-23 05:16:07 +0000144 int64_t time01 = Util::currentTime();
Marc Slemko740343d2006-07-20 00:31:02 +0000145
Mark Slee9b82d272007-05-23 05:16:07 +0000146 int64_t firstTime = 9223372036854775807LL;
147 int64_t lastTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000148
149 double averageTime = 0;
Mark Slee9b82d272007-05-23 05:16:07 +0000150 int64_t minTime = 9223372036854775807LL;
151 int64_t maxTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000152
Mark Sleef5f2be42006-09-05 21:05:31 +0000153 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000154
Marc Slemko6f038a72006-08-03 18:58:09 +0000155 shared_ptr<ThreadManagerTests::Task> task = *ix;
Marc Slemkoc7782972006-07-25 02:26:35 +0000156
Mark Slee9b82d272007-05-23 05:16:07 +0000157 int64_t delta = task->_endTime - task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000158
159 assert(delta > 0);
160
Mark Sleef5f2be42006-09-05 21:05:31 +0000161 if (task->_startTime < firstTime) {
David Reiss96d23882007-07-26 21:10:32 +0000162 firstTime = task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000163 }
164
Mark Sleef5f2be42006-09-05 21:05:31 +0000165 if (task->_endTime > lastTime) {
David Reiss96d23882007-07-26 21:10:32 +0000166 lastTime = task->_endTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000167 }
168
Mark Sleef5f2be42006-09-05 21:05:31 +0000169 if (delta < minTime) {
David Reiss96d23882007-07-26 21:10:32 +0000170 minTime = delta;
Marc Slemkoc7782972006-07-25 02:26:35 +0000171 }
172
Mark Sleef5f2be42006-09-05 21:05:31 +0000173 if (delta > maxTime) {
David Reiss96d23882007-07-26 21:10:32 +0000174 maxTime = delta;
Marc Slemkoc7782972006-07-25 02:26:35 +0000175 }
176
177 averageTime+= delta;
Marc Slemko740343d2006-07-20 00:31:02 +0000178 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000179
Marc Slemkoc7782972006-07-25 02:26:35 +0000180 averageTime /= count;
181
182 std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl;
Marc Slemko740343d2006-07-20 00:31:02 +0000183
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000184 double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout;
185
186 double error = ((time01 - time00) - expectedTime) / expectedTime;
187
Mark Sleef5f2be42006-09-05 21:05:31 +0000188 if (error < 0) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000189 error*= -1.0;
190 }
191
Marc Slemko6f038a72006-08-03 18:58:09 +0000192 bool success = error < ERROR;
Marc Slemkoc7782972006-07-25 02:26:35 +0000193
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000194 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl;
Marc Slemko740343d2006-07-20 00:31:02 +0000195
Marc Slemkoc7782972006-07-25 02:26:35 +0000196 return success;
Marc Slemko740343d2006-07-20 00:31:02 +0000197 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000198
199 class BlockTask: public Runnable {
200
201 public:
202
203 BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) :
204 _monitor(monitor),
205 _bmonitor(bmonitor),
206 _count(count) {}
207
208 void run() {
209 {
210 Synchronized s(_bmonitor);
211
212 _bmonitor.wait();
213
214 }
215
216 {
217 Synchronized s(_monitor);
218
219 _count--;
220
221 if (_count == 0) {
222
223 _monitor.notify();
224 }
225 }
226 }
227
228 Monitor& _monitor;
229 Monitor& _bmonitor;
230 size_t& _count;
231 };
232
233 /**
234 * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the
235 * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */
236
Mark Slee9b82d272007-05-23 05:16:07 +0000237 bool blockTest(int64_t timeout=100LL, size_t workerCount=2) {
Roger Meier3b771a12010-11-17 22:11:26 +0000238 (void) timeout;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000239 bool success = false;
240
241 try {
242
243 Monitor bmonitor;
244 Monitor monitor;
245
246 size_t pendingTaskMaxCount = workerCount;
247
248 size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
249
250 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
251
252 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
253
Marc Slemkoa6479032007-06-05 22:20:14 +0000254 threadFactory->setPriority(PosixThreadFactory::HIGHEST);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000255
256 threadManager->threadFactory(threadFactory);
257
258 threadManager->start();
259
260 std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
261
262 for (size_t ix = 0; ix < workerCount; ix++) {
263
264 tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0])));
265 }
266
267 for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
268
269 tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1])));
270 }
271
272 for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000273 threadManager->add(*ix);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000274 }
275
276 if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
277 throw TException("Unexpected pending task count");
278 }
279
280 shared_ptr<ThreadManagerTests::BlockTask> extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));
281
282 try {
283 threadManager->add(extraTask, 1);
284 throw TException("Unexpected success adding task in excess of pending task count");
David Reiss9fcacc82009-06-04 00:32:54 +0000285 } catch(TooManyPendingTasksException& e) {
286 throw TException("Should have timed out adding task in excess of pending task count");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000287 } catch(TimedOutException& e) {
David Reiss9fcacc82009-06-04 00:32:54 +0000288 // Expected result
289 }
290
291 try {
292 threadManager->add(extraTask, -1);
293 throw TException("Unexpected success adding task in excess of pending task count");
294 } catch(TimedOutException& e) {
295 throw TException("Unexpected timeout adding task in excess of pending task count");
296 } catch(TooManyPendingTasksException& e) {
297 // Expected result
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000298 }
299
300 std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
301
302 {
303 Synchronized s(bmonitor);
304
305 bmonitor.notifyAll();
306 }
307
308 {
309 Synchronized s(monitor);
310
311 while(activeCounts[0] != 0) {
312 monitor.wait();
313 }
314 }
315
316 std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
317
318 try {
319 threadManager->add(extraTask, 1);
320 } catch(TimedOutException& e) {
321 std::cout << "\t\t\t" << "add timed out unexpectedly" << std::endl;
322 throw TException("Unexpected timeout adding task");
323
324 } catch(TooManyPendingTasksException& e) {
325 std::cout << "\t\t\t" << "add encountered too many pending exepctions" << std::endl;
326 throw TException("Unexpected timeout adding task");
327 }
328
329 // Wake up tasks that were pending before and wait for them to complete
330
331 {
332 Synchronized s(bmonitor);
333
334 bmonitor.notifyAll();
335 }
336
337 {
338 Synchronized s(monitor);
339
340 while(activeCounts[1] != 0) {
341 monitor.wait();
342 }
343 }
344
345 // Wake up the extra task and wait for it to complete
346
347 {
348 Synchronized s(bmonitor);
349
350 bmonitor.notifyAll();
351 }
352
353 {
354 Synchronized s(monitor);
355
356 while(activeCounts[2] != 0) {
357 monitor.wait();
358 }
359 }
360
361 if(!(success = (threadManager->totalTaskCount() == 0))) {
362 throw TException("Unexpected pending task count");
363 }
364
365 } catch(TException& e) {
David Reiss9fcacc82009-06-04 00:32:54 +0000366 std::cout << "ERROR: " << e.what() << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000367 }
368
369 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
370 return success;
371 }
Marc Slemko740343d2006-07-20 00:31:02 +0000372};
Marc Slemko6f038a72006-08-03 18:58:09 +0000373
374const double ThreadManagerTests::ERROR = .20;
375
T Jake Lucianib5e62212009-01-31 22:36:20 +0000376}}}} // apache::thrift::concurrency
Marc Slemko740343d2006-07-20 00:31:02 +0000377
T Jake Lucianib5e62212009-01-31 22:36:20 +0000378using namespace apache::thrift::concurrency::test;
Marc Slemko740343d2006-07-20 00:31:02 +0000379