/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include <thrift/thrift-config.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/Util.h>

#include <assert.h>
#include <deque>
#include <set>
#include <iostream>
#include <stdint.h>
#include <vector>

namespace apache {
namespace thrift {
namespace concurrency {
namespace test {

using namespace apache::thrift::concurrency;

James E. King, IIIdf899132016-11-12 15:16:30 -050039static std::deque<boost::shared_ptr<Runnable> > m_expired;
40static void expiredNotifier(boost::shared_ptr<Runnable> runnable)
41{
42 m_expired.push_back(runnable);
43}
Marc Slemko740343d2006-07-20 00:31:02 +000044
James E. King, IIIdf899132016-11-12 15:16:30 -050045static void sleep_(int64_t millisec) {
46 Monitor _sleep;
47 Synchronized s(_sleep);
48
49 try {
50 _sleep.wait(millisec);
51 } catch (TimedOutException&) {
52 ;
53 } catch (...) {
54 assert(0);
55 }
56}
57
58class ThreadManagerTests {
Konrad Grochowski293a40e2014-09-04 17:28:17 +040059
Marc Slemko740343d2006-07-20 00:31:02 +000060public:
Konrad Grochowski16a23a62014-11-13 15:33:38 +010061 class Task : public Runnable {
Marc Slemko740343d2006-07-20 00:31:02 +000062
63 public:
Konrad Grochowski16a23a62014-11-13 15:33:38 +010064 Task(Monitor& monitor, size_t& count, int64_t timeout)
James E. King, III36200902016-10-05 14:47:18 -040065 : _monitor(monitor), _count(count), _timeout(timeout), _startTime(0), _endTime(0), _done(false) {}
Marc Slemko740343d2006-07-20 00:31:02 +000066
67 void run() {
68
Marc Slemkoc7782972006-07-25 02:26:35 +000069 _startTime = Util::currentTime();
Marc Slemko740343d2006-07-20 00:31:02 +000070
James E. King, IIIdf899132016-11-12 15:16:30 -050071 sleep_(_timeout);
Marc Slemko740343d2006-07-20 00:31:02 +000072
Marc Slemkoc7782972006-07-25 02:26:35 +000073 _endTime = Util::currentTime();
74
Marc Slemko740343d2006-07-20 00:31:02 +000075 _done = true;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000076
Mark Sleef5f2be42006-09-05 21:05:31 +000077 {
78 Synchronized s(_monitor);
Marc Slemko740343d2006-07-20 00:31:02 +000079
David Reiss96d23882007-07-26 21:10:32 +000080 // std::cout << "Thread " << _count << " completed " << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000081
David Reiss96d23882007-07-26 21:10:32 +000082 _count--;
James E. King, IIIdf899132016-11-12 15:16:30 -050083 if (_count % 10000 == 0) {
David Reiss96d23882007-07-26 21:10:32 +000084 _monitor.notify();
85 }
Marc Slemko740343d2006-07-20 00:31:02 +000086 }
87 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000088
Marc Slemko740343d2006-07-20 00:31:02 +000089 Monitor& _monitor;
90 size_t& _count;
Mark Slee9b82d272007-05-23 05:16:07 +000091 int64_t _timeout;
92 int64_t _startTime;
93 int64_t _endTime;
Marc Slemko740343d2006-07-20 00:31:02 +000094 bool _done;
Marc Slemkoc7782972006-07-25 02:26:35 +000095 Monitor _sleep;
Marc Slemko740343d2006-07-20 00:31:02 +000096 };
97
Mark Sleef5f2be42006-09-05 21:05:31 +000098 /**
99 * Dispatch count tasks, each of which blocks for timeout milliseconds then
100 * completes. Verify that all tasks completed and that thread manager cleans
101 * up properly on delete.
102 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100103 bool loadTest(size_t count = 100, int64_t timeout = 100LL, size_t workerCount = 4) {
Marc Slemko740343d2006-07-20 00:31:02 +0000104
105 Monitor monitor;
106
107 size_t activeCount = count;
108
Marc Slemko6f038a72006-08-03 18:58:09 +0000109 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
Marc Slemkoc7782972006-07-25 02:26:35 +0000110
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100111 shared_ptr<PlatformThreadFactory> threadFactory
112 = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
Marc Slemkoc7782972006-07-25 02:26:35 +0000113
Nobuaki Sukegawa28256642014-12-16 03:24:37 +0900114#if !USE_BOOST_THREAD && !USE_STD_THREAD
Marc Slemkoa6479032007-06-05 22:20:14 +0000115 threadFactory->setPriority(PosixThreadFactory::HIGHEST);
Roger Meier3faaedf2011-10-02 10:51:45 +0000116#endif
Marc Slemkoc7782972006-07-25 02:26:35 +0000117 threadManager->threadFactory(threadFactory);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000118
119 threadManager->start();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000120
Marc Slemko6f038a72006-08-03 18:58:09 +0000121 std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
Marc Slemko740343d2006-07-20 00:31:02 +0000122
Mark Sleef5f2be42006-09-05 21:05:31 +0000123 for (size_t ix = 0; ix < count; ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000124
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100125 tasks.insert(shared_ptr<ThreadManagerTests::Task>(
126 new ThreadManagerTests::Task(monitor, activeCount, timeout)));
Marc Slemko740343d2006-07-20 00:31:02 +0000127 }
128
Mark Slee9b82d272007-05-23 05:16:07 +0000129 int64_t time00 = Util::currentTime();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000130
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100131 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin();
132 ix != tasks.end();
133 ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000134
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100135 threadManager->add(*ix);
Marc Slemko740343d2006-07-20 00:31:02 +0000136 }
137
James E. King, IIIdf899132016-11-12 15:16:30 -0500138 std::cout << "\t\t\t\tloaded " << count << " tasks to execute" << std::endl;
139
Mark Sleef5f2be42006-09-05 21:05:31 +0000140 {
141 Synchronized s(monitor);
142
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100143 while (activeCount > 0) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500144 std::cout << "\t\t\t\tactiveCount = " << activeCount << std::endl;
David Reiss96d23882007-07-26 21:10:32 +0000145 monitor.wait();
Marc Slemko740343d2006-07-20 00:31:02 +0000146 }
147 }
148
Mark Slee9b82d272007-05-23 05:16:07 +0000149 int64_t time01 = Util::currentTime();
Marc Slemko740343d2006-07-20 00:31:02 +0000150
Mark Slee9b82d272007-05-23 05:16:07 +0000151 int64_t firstTime = 9223372036854775807LL;
152 int64_t lastTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000153
154 double averageTime = 0;
Mark Slee9b82d272007-05-23 05:16:07 +0000155 int64_t minTime = 9223372036854775807LL;
156 int64_t maxTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000157
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100158 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin();
159 ix != tasks.end();
160 ix++) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000161
Marc Slemko6f038a72006-08-03 18:58:09 +0000162 shared_ptr<ThreadManagerTests::Task> task = *ix;
Marc Slemkoc7782972006-07-25 02:26:35 +0000163
Mark Slee9b82d272007-05-23 05:16:07 +0000164 int64_t delta = task->_endTime - task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000165
166 assert(delta > 0);
167
Mark Sleef5f2be42006-09-05 21:05:31 +0000168 if (task->_startTime < firstTime) {
David Reiss96d23882007-07-26 21:10:32 +0000169 firstTime = task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000170 }
171
Mark Sleef5f2be42006-09-05 21:05:31 +0000172 if (task->_endTime > lastTime) {
David Reiss96d23882007-07-26 21:10:32 +0000173 lastTime = task->_endTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000174 }
175
Mark Sleef5f2be42006-09-05 21:05:31 +0000176 if (delta < minTime) {
David Reiss96d23882007-07-26 21:10:32 +0000177 minTime = delta;
Marc Slemkoc7782972006-07-25 02:26:35 +0000178 }
179
Mark Sleef5f2be42006-09-05 21:05:31 +0000180 if (delta > maxTime) {
David Reiss96d23882007-07-26 21:10:32 +0000181 maxTime = delta;
Marc Slemkoc7782972006-07-25 02:26:35 +0000182 }
183
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100184 averageTime += delta;
Marc Slemko740343d2006-07-20 00:31:02 +0000185 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000186
Marc Slemkoc7782972006-07-25 02:26:35 +0000187 averageTime /= count;
188
James E. King, IIIdf899132016-11-12 15:16:30 -0500189 std::cout << "\t\t\tfirst start: " << firstTime << " Last end: " << lastTime
190 << " min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100191 << "ms" << std::endl;
Marc Slemko740343d2006-07-20 00:31:02 +0000192
James E. King, IIIdf899132016-11-12 15:16:30 -0500193 bool success = (time01 - time00) >= ((int64_t)count * timeout) / (int64_t)workerCount;
Marc Slemkoc7782972006-07-25 02:26:35 +0000194
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100195 std::cout << "\t\t\t" << (success ? "Success" : "Failure")
James E. King, IIIdf899132016-11-12 15:16:30 -0500196 << "! expected time: " << ((int64_t)count * timeout) / (int64_t)workerCount << "ms elapsed time: " << time01 - time00
197 << "ms" << std::endl;
Marc Slemko740343d2006-07-20 00:31:02 +0000198
Marc Slemkoc7782972006-07-25 02:26:35 +0000199 return success;
Marc Slemko740343d2006-07-20 00:31:02 +0000200 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000201
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100202 class BlockTask : public Runnable {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000203
204 public:
James E. King, IIIdf899132016-11-12 15:16:30 -0500205 BlockTask(Monitor& entryMonitor, Monitor& blockMonitor, bool& blocked, Monitor& doneMonitor, size_t& count)
206 : _entryMonitor(entryMonitor), _entered(false), _blockMonitor(blockMonitor), _blocked(blocked), _doneMonitor(doneMonitor), _count(count) {}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000207
208 void run() {
209 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500210 Synchronized s(_entryMonitor);
211 _entered = true;
212 _entryMonitor.notify();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000213 }
214
215 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500216 Synchronized s(_blockMonitor);
217 while (_blocked) {
218 _blockMonitor.wait();
219 }
220 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000221
James E. King, IIIdf899132016-11-12 15:16:30 -0500222 {
223 Synchronized s(_doneMonitor);
224 if (--_count == 0) {
225 _doneMonitor.notify();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000226 }
227 }
228 }
229
James E. King, IIIdf899132016-11-12 15:16:30 -0500230 Monitor& _entryMonitor;
231 bool _entered;
232 Monitor& _blockMonitor;
233 bool& _blocked;
234 Monitor& _doneMonitor;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000235 size_t& _count;
236 };
237
238 /**
239 * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the
240 * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */
241
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100242 bool blockTest(int64_t timeout = 100LL, size_t workerCount = 2) {
243 (void)timeout;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000244 bool success = false;
245
246 try {
247
James E. King, IIIdf899132016-11-12 15:16:30 -0500248 Monitor entryMonitor; // not used by this test
249 Monitor blockMonitor;
250 bool blocked[] = {true, true, true};
251 Monitor doneMonitor;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000252
253 size_t pendingTaskMaxCount = workerCount;
254
255 size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
256
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100257 shared_ptr<ThreadManager> threadManager
258 = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000259
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100260 shared_ptr<PlatformThreadFactory> threadFactory
261 = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000262
Nobuaki Sukegawa28256642014-12-16 03:24:37 +0900263#if !USE_BOOST_THREAD && !USE_STD_THREAD
Marc Slemkoa6479032007-06-05 22:20:14 +0000264 threadFactory->setPriority(PosixThreadFactory::HIGHEST);
Roger Meier3faaedf2011-10-02 10:51:45 +0000265#endif
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000266 threadManager->threadFactory(threadFactory);
267
268 threadManager->start();
269
James E. King, IIIdf899132016-11-12 15:16:30 -0500270 std::vector<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
271 tasks.reserve(workerCount + pendingTaskMaxCount);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000272
273 for (size_t ix = 0; ix < workerCount; ix++) {
274
James E. King, IIIdf899132016-11-12 15:16:30 -0500275 tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>(
276 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[0], doneMonitor, activeCounts[0])));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000277 }
278
279 for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
280
James E. King, IIIdf899132016-11-12 15:16:30 -0500281 tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>(
282 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[1], doneMonitor, activeCounts[1])));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000283 }
284
James E. King, IIIdf899132016-11-12 15:16:30 -0500285 for (std::vector<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100286 ix != tasks.end();
287 ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000288 threadManager->add(*ix);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000289 }
290
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100291 if (!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000292 throw TException("Unexpected pending task count");
293 }
294
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100295 shared_ptr<ThreadManagerTests::BlockTask> extraTask(
James E. King, IIIdf899132016-11-12 15:16:30 -0500296 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[2], doneMonitor, activeCounts[2]));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000297
298 try {
299 threadManager->add(extraTask, 1);
300 throw TException("Unexpected success adding task in excess of pending task count");
ben-craigfae08e72015-07-15 11:34:47 -0500301 } catch (TooManyPendingTasksException&) {
David Reiss9fcacc82009-06-04 00:32:54 +0000302 throw TException("Should have timed out adding task in excess of pending task count");
ben-craigfae08e72015-07-15 11:34:47 -0500303 } catch (TimedOutException&) {
David Reiss9fcacc82009-06-04 00:32:54 +0000304 // Expected result
305 }
306
307 try {
308 threadManager->add(extraTask, -1);
309 throw TException("Unexpected success adding task in excess of pending task count");
ben-craigfae08e72015-07-15 11:34:47 -0500310 } catch (TimedOutException&) {
David Reiss9fcacc82009-06-04 00:32:54 +0000311 throw TException("Unexpected timeout adding task in excess of pending task count");
ben-craigfae08e72015-07-15 11:34:47 -0500312 } catch (TooManyPendingTasksException&) {
David Reiss9fcacc82009-06-04 00:32:54 +0000313 // Expected result
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000314 }
315
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100316 std::cout << "\t\t\t"
317 << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000318
319 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500320 Synchronized s(blockMonitor);
321 blocked[0] = false;
322 blockMonitor.notifyAll();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000323 }
324
325 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500326 Synchronized s(doneMonitor);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100327 while (activeCounts[0] != 0) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500328 doneMonitor.wait();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000329 }
330 }
331
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100332 std::cout << "\t\t\t"
333 << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000334
335 try {
336 threadManager->add(extraTask, 1);
ben-craigfae08e72015-07-15 11:34:47 -0500337 } catch (TimedOutException&) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100338 std::cout << "\t\t\t"
339 << "add timed out unexpectedly" << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000340 throw TException("Unexpected timeout adding task");
341
ben-craigfae08e72015-07-15 11:34:47 -0500342 } catch (TooManyPendingTasksException&) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100343 std::cout << "\t\t\t"
344 << "add encountered too many pending exepctions" << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000345 throw TException("Unexpected timeout adding task");
346 }
347
348 // Wake up tasks that were pending before and wait for them to complete
349
350 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500351 Synchronized s(blockMonitor);
352 blocked[1] = false;
353 blockMonitor.notifyAll();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000354 }
355
356 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500357 Synchronized s(doneMonitor);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100358 while (activeCounts[1] != 0) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500359 doneMonitor.wait();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000360 }
361 }
362
363 // Wake up the extra task and wait for it to complete
364
365 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500366 Synchronized s(blockMonitor);
367 blocked[2] = false;
368 blockMonitor.notifyAll();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000369 }
370
371 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500372 Synchronized s(doneMonitor);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100373 while (activeCounts[2] != 0) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500374 doneMonitor.wait();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000375 }
376 }
377
James E. King, IIIdf899132016-11-12 15:16:30 -0500378 threadManager->stop();
379
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100380 if (!(success = (threadManager->totalTaskCount() == 0))) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500381 throw TException("Unexpected total task count");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000382 }
383
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100384 } catch (TException& e) {
David Reiss9fcacc82009-06-04 00:32:54 +0000385 std::cout << "ERROR: " << e.what() << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000386 }
387
388 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
389 return success;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100390 }
James E. King, IIIdf899132016-11-12 15:16:30 -0500391
392
393 bool apiTest() {
394
395 // prove currentTime has milliseconds granularity since many other things depend on it
396 int64_t a = Util::currentTime();
397 sleep_(100);
398 int64_t b = Util::currentTime();
399 if (b - a < 50 || b - a > 150) {
400 std::cerr << "\t\t\texpected 100ms gap, found " << (b-a) << "ms gap instead." << std::endl;
401 return false;
402 }
403
404#if !USE_BOOST_THREAD && !USE_STD_THREAD
405 // test once with a detached thread factory and once with a joinable thread factory
406
407 shared_ptr<PosixThreadFactory> threadFactory
408 = shared_ptr<PosixThreadFactory>(new PosixThreadFactory(false));
409
410 std::cout << "\t\t\tapiTest with joinable thread factory" << std::endl;
411 if (!apiTestWithThreadFactory(threadFactory)) {
412 return false;
413 }
414
415 threadFactory.reset(new PosixThreadFactory(true));
416 std::cout << "\t\t\tapiTest with detached thread factory" << std::endl;
417 return apiTestWithThreadFactory(threadFactory);
418#else
419 return apiTestWithThreadFactory(shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
420#endif
421
422 }
423
424 bool apiTestWithThreadFactory(shared_ptr<PlatformThreadFactory> threadFactory)
425 {
426 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(1);
427 threadManager->threadFactory(threadFactory);
428
429#if !USE_BOOST_THREAD && !USE_STD_THREAD
430 threadFactory->setPriority(PosixThreadFactory::HIGHEST);
431
432 // verify we cannot change the thread factory to one with the opposite detached setting
433 shared_ptr<PlatformThreadFactory> threadFactory2
434 = shared_ptr<PosixThreadFactory>(new PlatformThreadFactory(
435 PosixThreadFactory::ROUND_ROBIN,
436 PosixThreadFactory::NORMAL,
437 1,
438 !threadFactory->isDetached()));
439 try {
440 threadManager->threadFactory(threadFactory2);
441 // if the call succeeded we changed the thread factory to one that had the opposite setting for "isDetached()".
442 // this is bad, because the thread manager checks with the thread factory to see if it should join threads
443 // as they are leaving - so the detached status of new threads cannot change while there are existing threads.
444 std::cerr << "\t\t\tShould not be able to change thread factory detached disposition" << std::endl;
445 return false;
446 }
447 catch (InvalidArgumentException& ex) {
448 /* expected */
449 }
450#endif
451
452 std::cout << "\t\t\t\tstarting.. " << std::endl;
453
454 threadManager->start();
455 threadManager->setExpireCallback(expiredNotifier); // apache::thrift::stdcxx::bind(&ThreadManagerTests::expiredNotifier, this));
456
457#define EXPECT(FUNC, COUNT) { size_t c = FUNC; if (c != COUNT) { std::cerr << "expected " #FUNC" to be " #COUNT ", but was " << c << std::endl; return false; } }
458
459 EXPECT(threadManager->workerCount(), 1);
460 EXPECT(threadManager->idleWorkerCount(), 1);
461 EXPECT(threadManager->pendingTaskCount(), 0);
462
463 std::cout << "\t\t\t\tadd 2nd worker.. " << std::endl;
464
465 threadManager->addWorker();
466
467 EXPECT(threadManager->workerCount(), 2);
468 EXPECT(threadManager->idleWorkerCount(), 2);
469 EXPECT(threadManager->pendingTaskCount(), 0);
470
471 std::cout << "\t\t\t\tremove 2nd worker.. " << std::endl;
472
473 threadManager->removeWorker();
474
475 EXPECT(threadManager->workerCount(), 1);
476 EXPECT(threadManager->idleWorkerCount(), 1);
477 EXPECT(threadManager->pendingTaskCount(), 0);
478
479 std::cout << "\t\t\t\tremove 1st worker.. " << std::endl;
480
481 threadManager->removeWorker();
482
483 EXPECT(threadManager->workerCount(), 0);
484 EXPECT(threadManager->idleWorkerCount(), 0);
485 EXPECT(threadManager->pendingTaskCount(), 0);
486
487 std::cout << "\t\t\t\tadd blocking task.. " << std::endl;
488
489 // We're going to throw a blocking task into the mix
490 Monitor entryMonitor; // signaled when task is running
491 Monitor blockMonitor; // to be signaled to unblock the task
492 bool blocked(true); // set to false before notifying
493 Monitor doneMonitor; // signaled when count reaches zero
494 size_t activeCount = 1;
495 shared_ptr<ThreadManagerTests::BlockTask> blockingTask(
496 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked, doneMonitor, activeCount));
497 threadManager->add(blockingTask);
498
499 EXPECT(threadManager->workerCount(), 0);
500 EXPECT(threadManager->idleWorkerCount(), 0);
501 EXPECT(threadManager->pendingTaskCount(), 1);
502
503 std::cout << "\t\t\t\tadd other task.. " << std::endl;
504
505 shared_ptr<ThreadManagerTests::Task> otherTask(
506 new ThreadManagerTests::Task(doneMonitor, activeCount, 0));
507
508 threadManager->add(otherTask);
509
510 EXPECT(threadManager->workerCount(), 0);
511 EXPECT(threadManager->idleWorkerCount(), 0);
512 EXPECT(threadManager->pendingTaskCount(), 2);
513
514 std::cout << "\t\t\t\tremove blocking task specifically.. " << std::endl;
515
516 threadManager->remove(blockingTask);
517
518 EXPECT(threadManager->workerCount(), 0);
519 EXPECT(threadManager->idleWorkerCount(), 0);
520 EXPECT(threadManager->pendingTaskCount(), 1);
521
522 std::cout << "\t\t\t\tremove next pending task.." << std::endl;
523
524 shared_ptr<Runnable> nextTask = threadManager->removeNextPending();
525 if (nextTask != otherTask) {
526 std::cerr << "\t\t\t\t\texpected removeNextPending to return otherTask" << std::endl;
527 return false;
528 }
529
530 EXPECT(threadManager->workerCount(), 0);
531 EXPECT(threadManager->idleWorkerCount(), 0);
532 EXPECT(threadManager->pendingTaskCount(), 0);
533
534 std::cout << "\t\t\t\tremove next pending task (none left).." << std::endl;
535
536 nextTask = threadManager->removeNextPending();
537 if (nextTask) {
538 std::cerr << "\t\t\t\t\texpected removeNextPending to return an empty Runnable" << std::endl;
539 return false;
540 }
541
542 std::cout << "\t\t\t\tadd 2 expired tasks and 1 not.." << std::endl;
543
544 shared_ptr<ThreadManagerTests::Task> expiredTask(
545 new ThreadManagerTests::Task(doneMonitor, activeCount, 0));
546
547 threadManager->add(expiredTask, 0, 1);
548 threadManager->add(blockingTask); // add one that hasn't expired to make sure it gets skipped
549 threadManager->add(expiredTask, 0, 1); // add a second expired to ensure removeExpiredTasks removes both
550
551 sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1 millisecond
552
553 EXPECT(threadManager->workerCount(), 0);
554 EXPECT(threadManager->idleWorkerCount(), 0);
555 EXPECT(threadManager->pendingTaskCount(), 3);
556 EXPECT(threadManager->expiredTaskCount(), 0);
557
558 std::cout << "\t\t\t\tremove expired tasks.." << std::endl;
559
560 if (!m_expired.empty()) {
561 std::cerr << "\t\t\t\t\texpected m_expired to be empty" << std::endl;
562 return false;
563 }
564
565 threadManager->removeExpiredTasks();
566
567 if (m_expired.size() != 2) {
568 std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl;
569 return false;
570 }
571
572 if (m_expired.front() != expiredTask) {
573 std::cerr << "\t\t\t\t\texpected m_expired[0] to be the expired task" << std::endl;
574 return false;
575 }
576 m_expired.pop_front();
577
578 if (m_expired.front() != expiredTask) {
579 std::cerr << "\t\t\t\t\texpected m_expired[1] to be the expired task" << std::endl;
580 return false;
581 }
582
583 m_expired.clear();
584
585 threadManager->remove(blockingTask);
586
587 EXPECT(threadManager->workerCount(), 0);
588 EXPECT(threadManager->idleWorkerCount(), 0);
589 EXPECT(threadManager->pendingTaskCount(), 0);
590 EXPECT(threadManager->expiredTaskCount(), 2);
591
592 std::cout << "\t\t\t\tadd expired task (again).." << std::endl;
593
594 threadManager->add(expiredTask, 0, 1); // expires in 1ms
595 sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1ms
596
597 std::cout << "\t\t\t\tadd worker to consume expired task.." << std::endl;
598
599 threadManager->addWorker();
600 sleep_(100); // make sure it has time to spin up and expire the task
601
602 if (m_expired.empty()) {
603 std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl;
604 return false;
605 }
606
607 if (m_expired.front() != expiredTask) {
608 std::cerr << "\t\t\t\t\texpected m_expired to be the expired task" << std::endl;
609 return false;
610 }
611
612 m_expired.clear();
613
614 EXPECT(threadManager->workerCount(), 1);
615 EXPECT(threadManager->idleWorkerCount(), 1);
616 EXPECT(threadManager->pendingTaskCount(), 0);
617 EXPECT(threadManager->expiredTaskCount(), 3);
618
619 std::cout << "\t\t\t\ttry to remove too many workers" << std::endl;
620 try {
621 threadManager->removeWorker(2);
622 std::cerr << "\t\t\t\t\texpected InvalidArgumentException" << std::endl;
623 return false;
624 } catch (const InvalidArgumentException&) {
625 /* expected */
626 }
627
628 std::cout << "\t\t\t\tremove worker.. " << std::endl;
629
630 threadManager->removeWorker();
631
632 EXPECT(threadManager->workerCount(), 0);
633 EXPECT(threadManager->idleWorkerCount(), 0);
634 EXPECT(threadManager->pendingTaskCount(), 0);
635 EXPECT(threadManager->expiredTaskCount(), 3);
636
637 std::cout << "\t\t\t\tadd blocking task.. " << std::endl;
638
639 threadManager->add(blockingTask);
640
641 EXPECT(threadManager->workerCount(), 0);
642 EXPECT(threadManager->idleWorkerCount(), 0);
643 EXPECT(threadManager->pendingTaskCount(), 1);
644
645 std::cout << "\t\t\t\tadd worker.. " << std::endl;
646
647 threadManager->addWorker();
648 {
649 Synchronized s(entryMonitor);
650 while (!blockingTask->_entered) {
651 entryMonitor.wait();
652 }
653 }
654
655 EXPECT(threadManager->workerCount(), 1);
656 EXPECT(threadManager->idleWorkerCount(), 0);
657 EXPECT(threadManager->pendingTaskCount(), 0);
658
659 std::cout << "\t\t\t\tunblock task and remove worker.. " << std::endl;
660
661 {
662 Synchronized s(blockMonitor);
663 blocked = false;
664 blockMonitor.notifyAll();
665 }
666 threadManager->removeWorker();
667
668 EXPECT(threadManager->workerCount(), 0);
669 EXPECT(threadManager->idleWorkerCount(), 0);
670 EXPECT(threadManager->pendingTaskCount(), 0);
671
672 std::cout << "\t\t\t\tcleanup.. " << std::endl;
673
674 blockingTask.reset();
675 threadManager.reset();
676 return true;
677 }
Marc Slemko740343d2006-07-20 00:31:02 +0000678};
Marc Slemko6f038a72006-08-03 18:58:09 +0000679
}
}
}
} // apache::thrift::concurrency::test

using namespace apache::thrift::concurrency::test;