blob: 904948c4cbe77a74690c6adb03214f5408a38929 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Roger Meierba406d32013-07-15 22:41:34 +020020#include <thrift/thrift-config.h>
Roger Meier49ff8b12012-04-13 09:12:31 +000021#include <thrift/concurrency/ThreadManager.h>
22#include <thrift/concurrency/PlatformThreadFactory.h>
23#include <thrift/concurrency/Monitor.h>
24#include <thrift/concurrency/Util.h>
Marc Slemko740343d2006-07-20 00:31:02 +000025
26#include <assert.h>
27#include <set>
28#include <iostream>
Marc Slemkoc7782972006-07-25 02:26:35 +000029#include <set>
30#include <stdint.h>
Marc Slemko740343d2006-07-20 00:31:02 +000031
Konrad Grochowski16a23a62014-11-13 15:33:38 +010032namespace apache {
33namespace thrift {
34namespace concurrency {
35namespace test {
Marc Slemko740343d2006-07-20 00:31:02 +000036
T Jake Lucianib5e62212009-01-31 22:36:20 +000037using namespace apache::thrift::concurrency;
Marc Slemko740343d2006-07-20 00:31:02 +000038
Marc Slemko740343d2006-07-20 00:31:02 +000039class ThreadManagerTests {
40
Konrad Grochowski293a40e2014-09-04 17:28:17 +040041 static const double TEST_TOLERANCE;
42
Marc Slemko740343d2006-07-20 00:31:02 +000043public:
Konrad Grochowski16a23a62014-11-13 15:33:38 +010044 class Task : public Runnable {
Marc Slemko740343d2006-07-20 00:31:02 +000045
46 public:
Konrad Grochowski16a23a62014-11-13 15:33:38 +010047 Task(Monitor& monitor, size_t& count, int64_t timeout)
48 : _monitor(monitor), _count(count), _timeout(timeout), _done(false) {}
Marc Slemko740343d2006-07-20 00:31:02 +000049
50 void run() {
51
Marc Slemkoc7782972006-07-25 02:26:35 +000052 _startTime = Util::currentTime();
Marc Slemko740343d2006-07-20 00:31:02 +000053
Mark Sleef5f2be42006-09-05 21:05:31 +000054 {
55 Synchronized s(_sleep);
Marc Slemko740343d2006-07-20 00:31:02 +000056
Marc Slemko3a3b53b2007-05-22 23:59:54 +000057 try {
58 _sleep.wait(_timeout);
Konrad Grochowski16a23a62014-11-13 15:33:38 +010059 } catch (TimedOutException& e) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000060 ;
Konrad Grochowski16a23a62014-11-13 15:33:38 +010061 } catch (...) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000062 assert(0);
63 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000064 }
Marc Slemko740343d2006-07-20 00:31:02 +000065
Marc Slemkoc7782972006-07-25 02:26:35 +000066 _endTime = Util::currentTime();
67
Marc Slemko740343d2006-07-20 00:31:02 +000068 _done = true;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000069
Mark Sleef5f2be42006-09-05 21:05:31 +000070 {
71 Synchronized s(_monitor);
Marc Slemko740343d2006-07-20 00:31:02 +000072
David Reiss96d23882007-07-26 21:10:32 +000073 // std::cout << "Thread " << _count << " completed " << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000074
David Reiss96d23882007-07-26 21:10:32 +000075 _count--;
Marc Slemko740343d2006-07-20 00:31:02 +000076
David Reiss96d23882007-07-26 21:10:32 +000077 if (_count == 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000078
David Reiss96d23882007-07-26 21:10:32 +000079 _monitor.notify();
80 }
Marc Slemko740343d2006-07-20 00:31:02 +000081 }
82 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000083
Marc Slemko740343d2006-07-20 00:31:02 +000084 Monitor& _monitor;
85 size_t& _count;
Mark Slee9b82d272007-05-23 05:16:07 +000086 int64_t _timeout;
87 int64_t _startTime;
88 int64_t _endTime;
Marc Slemko740343d2006-07-20 00:31:02 +000089 bool _done;
Marc Slemkoc7782972006-07-25 02:26:35 +000090 Monitor _sleep;
Marc Slemko740343d2006-07-20 00:31:02 +000091 };
92
Mark Sleef5f2be42006-09-05 21:05:31 +000093 /**
94 * Dispatch count tasks, each of which blocks for timeout milliseconds then
95 * completes. Verify that all tasks completed and that thread manager cleans
96 * up properly on delete.
97 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +010098 bool loadTest(size_t count = 100, int64_t timeout = 100LL, size_t workerCount = 4) {
Marc Slemko740343d2006-07-20 00:31:02 +000099
100 Monitor monitor;
101
102 size_t activeCount = count;
103
Marc Slemko6f038a72006-08-03 18:58:09 +0000104 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
Marc Slemkoc7782972006-07-25 02:26:35 +0000105
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100106 shared_ptr<PlatformThreadFactory> threadFactory
107 = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
Marc Slemkoc7782972006-07-25 02:26:35 +0000108
Nobuaki Sukegawa28256642014-12-16 03:24:37 +0900109#if !USE_BOOST_THREAD && !USE_STD_THREAD
Marc Slemkoa6479032007-06-05 22:20:14 +0000110 threadFactory->setPriority(PosixThreadFactory::HIGHEST);
Roger Meier3faaedf2011-10-02 10:51:45 +0000111#endif
Marc Slemkoc7782972006-07-25 02:26:35 +0000112 threadManager->threadFactory(threadFactory);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000113
114 threadManager->start();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000115
Marc Slemko6f038a72006-08-03 18:58:09 +0000116 std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
Marc Slemko740343d2006-07-20 00:31:02 +0000117
Mark Sleef5f2be42006-09-05 21:05:31 +0000118 for (size_t ix = 0; ix < count; ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000119
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100120 tasks.insert(shared_ptr<ThreadManagerTests::Task>(
121 new ThreadManagerTests::Task(monitor, activeCount, timeout)));
Marc Slemko740343d2006-07-20 00:31:02 +0000122 }
123
Mark Slee9b82d272007-05-23 05:16:07 +0000124 int64_t time00 = Util::currentTime();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000125
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100126 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin();
127 ix != tasks.end();
128 ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000129
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100130 threadManager->add(*ix);
Marc Slemko740343d2006-07-20 00:31:02 +0000131 }
132
Mark Sleef5f2be42006-09-05 21:05:31 +0000133 {
134 Synchronized s(monitor);
135
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100136 while (activeCount > 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000137
David Reiss96d23882007-07-26 21:10:32 +0000138 monitor.wait();
Marc Slemko740343d2006-07-20 00:31:02 +0000139 }
140 }
141
Mark Slee9b82d272007-05-23 05:16:07 +0000142 int64_t time01 = Util::currentTime();
Marc Slemko740343d2006-07-20 00:31:02 +0000143
Mark Slee9b82d272007-05-23 05:16:07 +0000144 int64_t firstTime = 9223372036854775807LL;
145 int64_t lastTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000146
147 double averageTime = 0;
Mark Slee9b82d272007-05-23 05:16:07 +0000148 int64_t minTime = 9223372036854775807LL;
149 int64_t maxTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000150
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100151 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin();
152 ix != tasks.end();
153 ix++) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000154
Marc Slemko6f038a72006-08-03 18:58:09 +0000155 shared_ptr<ThreadManagerTests::Task> task = *ix;
Marc Slemkoc7782972006-07-25 02:26:35 +0000156
Mark Slee9b82d272007-05-23 05:16:07 +0000157 int64_t delta = task->_endTime - task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000158
159 assert(delta > 0);
160
Mark Sleef5f2be42006-09-05 21:05:31 +0000161 if (task->_startTime < firstTime) {
David Reiss96d23882007-07-26 21:10:32 +0000162 firstTime = task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000163 }
164
Mark Sleef5f2be42006-09-05 21:05:31 +0000165 if (task->_endTime > lastTime) {
David Reiss96d23882007-07-26 21:10:32 +0000166 lastTime = task->_endTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000167 }
168
Mark Sleef5f2be42006-09-05 21:05:31 +0000169 if (delta < minTime) {
David Reiss96d23882007-07-26 21:10:32 +0000170 minTime = delta;
Marc Slemkoc7782972006-07-25 02:26:35 +0000171 }
172
Mark Sleef5f2be42006-09-05 21:05:31 +0000173 if (delta > maxTime) {
David Reiss96d23882007-07-26 21:10:32 +0000174 maxTime = delta;
Marc Slemkoc7782972006-07-25 02:26:35 +0000175 }
176
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100177 averageTime += delta;
Marc Slemko740343d2006-07-20 00:31:02 +0000178 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000179
Marc Slemkoc7782972006-07-25 02:26:35 +0000180 averageTime /= count;
181
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100182 std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime
183 << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime
184 << "ms" << std::endl;
Marc Slemko740343d2006-07-20 00:31:02 +0000185
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000186 double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout;
187
188 double error = ((time01 - time00) - expectedTime) / expectedTime;
189
Mark Sleef5f2be42006-09-05 21:05:31 +0000190 if (error < 0) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100191 error *= -1.0;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000192 }
193
Konrad Grochowski293a40e2014-09-04 17:28:17 +0400194 bool success = error < TEST_TOLERANCE;
Marc Slemkoc7782972006-07-25 02:26:35 +0000195
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100196 std::cout << "\t\t\t" << (success ? "Success" : "Failure")
197 << "! expected time: " << expectedTime << "ms elapsed time: " << time01 - time00
198 << "ms error%: " << error * 100.0 << std::endl;
Marc Slemko740343d2006-07-20 00:31:02 +0000199
Marc Slemkoc7782972006-07-25 02:26:35 +0000200 return success;
Marc Slemko740343d2006-07-20 00:31:02 +0000201 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000202
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100203 class BlockTask : public Runnable {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000204
205 public:
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100206 BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count)
207 : _monitor(monitor), _bmonitor(bmonitor), _count(count) {}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000208
209 void run() {
210 {
211 Synchronized s(_bmonitor);
212
213 _bmonitor.wait();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000214 }
215
216 {
217 Synchronized s(_monitor);
218
219 _count--;
220
221 if (_count == 0) {
222
223 _monitor.notify();
224 }
225 }
226 }
227
228 Monitor& _monitor;
229 Monitor& _bmonitor;
230 size_t& _count;
231 };
232
233 /**
234 * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the
235 * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */
236
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100237 bool blockTest(int64_t timeout = 100LL, size_t workerCount = 2) {
238 (void)timeout;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000239 bool success = false;
240
241 try {
242
243 Monitor bmonitor;
244 Monitor monitor;
245
246 size_t pendingTaskMaxCount = workerCount;
247
248 size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
249
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100250 shared_ptr<ThreadManager> threadManager
251 = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000252
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100253 shared_ptr<PlatformThreadFactory> threadFactory
254 = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000255
Nobuaki Sukegawa28256642014-12-16 03:24:37 +0900256#if !USE_BOOST_THREAD && !USE_STD_THREAD
Marc Slemkoa6479032007-06-05 22:20:14 +0000257 threadFactory->setPriority(PosixThreadFactory::HIGHEST);
Roger Meier3faaedf2011-10-02 10:51:45 +0000258#endif
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000259 threadManager->threadFactory(threadFactory);
260
261 threadManager->start();
262
263 std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
264
265 for (size_t ix = 0; ix < workerCount; ix++) {
266
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100267 tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(
268 new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[0])));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000269 }
270
271 for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
272
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100273 tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(
274 new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[1])));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000275 }
276
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100277 for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin();
278 ix != tasks.end();
279 ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000280 threadManager->add(*ix);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000281 }
282
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100283 if (!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000284 throw TException("Unexpected pending task count");
285 }
286
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100287 shared_ptr<ThreadManagerTests::BlockTask> extraTask(
288 new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000289
290 try {
291 threadManager->add(extraTask, 1);
292 throw TException("Unexpected success adding task in excess of pending task count");
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100293 } catch (TooManyPendingTasksException& e) {
David Reiss9fcacc82009-06-04 00:32:54 +0000294 throw TException("Should have timed out adding task in excess of pending task count");
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100295 } catch (TimedOutException& e) {
David Reiss9fcacc82009-06-04 00:32:54 +0000296 // Expected result
297 }
298
299 try {
300 threadManager->add(extraTask, -1);
301 throw TException("Unexpected success adding task in excess of pending task count");
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100302 } catch (TimedOutException& e) {
David Reiss9fcacc82009-06-04 00:32:54 +0000303 throw TException("Unexpected timeout adding task in excess of pending task count");
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100304 } catch (TooManyPendingTasksException& e) {
David Reiss9fcacc82009-06-04 00:32:54 +0000305 // Expected result
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000306 }
307
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100308 std::cout << "\t\t\t"
309 << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000310
311 {
312 Synchronized s(bmonitor);
313
314 bmonitor.notifyAll();
315 }
316
317 {
318 Synchronized s(monitor);
319
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100320 while (activeCounts[0] != 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000321 monitor.wait();
322 }
323 }
324
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100325 std::cout << "\t\t\t"
326 << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000327
328 try {
329 threadManager->add(extraTask, 1);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100330 } catch (TimedOutException& e) {
331 std::cout << "\t\t\t"
332 << "add timed out unexpectedly" << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000333 throw TException("Unexpected timeout adding task");
334
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100335 } catch (TooManyPendingTasksException& e) {
336 std::cout << "\t\t\t"
337 << "add encountered too many pending exepctions" << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000338 throw TException("Unexpected timeout adding task");
339 }
340
341 // Wake up tasks that were pending before and wait for them to complete
342
343 {
344 Synchronized s(bmonitor);
345
346 bmonitor.notifyAll();
347 }
348
349 {
350 Synchronized s(monitor);
351
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100352 while (activeCounts[1] != 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000353 monitor.wait();
354 }
355 }
356
357 // Wake up the extra task and wait for it to complete
358
359 {
360 Synchronized s(bmonitor);
361
362 bmonitor.notifyAll();
363 }
364
365 {
366 Synchronized s(monitor);
367
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100368 while (activeCounts[2] != 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000369 monitor.wait();
370 }
371 }
372
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100373 if (!(success = (threadManager->totalTaskCount() == 0))) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000374 throw TException("Unexpected pending task count");
375 }
376
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100377 } catch (TException& e) {
David Reiss9fcacc82009-06-04 00:32:54 +0000378 std::cout << "ERROR: " << e.what() << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000379 }
380
381 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
382 return success;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100383 }
Marc Slemko740343d2006-07-20 00:31:02 +0000384};
Marc Slemko6f038a72006-08-03 18:58:09 +0000385
Konrad Grochowski293a40e2014-09-04 17:28:17 +0400386const double ThreadManagerTests::TEST_TOLERANCE = .20;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100387}
388}
389}
390} // apache::thrift::concurrency
Marc Slemko740343d2006-07-20 00:31:02 +0000391
T Jake Lucianib5e62212009-01-31 22:36:20 +0000392using namespace apache::thrift::concurrency::test;