blob: d0494ec5d1676216653eda16f39f61ab08daab25 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Roger Meierba406d32013-07-15 22:41:34 +020020#include <thrift/thrift-config.h>
Roger Meier49ff8b12012-04-13 09:12:31 +000021#include <thrift/concurrency/ThreadManager.h>
cyyca8af9b2019-01-11 22:13:12 +080022#include <thrift/concurrency/ThreadFactory.h>
Roger Meier49ff8b12012-04-13 09:12:31 +000023#include <thrift/concurrency/Monitor.h>
Marc Slemko740343d2006-07-20 00:31:02 +000024
25#include <assert.h>
James E. King, IIIdf899132016-11-12 15:16:30 -050026#include <deque>
Marc Slemko740343d2006-07-20 00:31:02 +000027#include <set>
28#include <iostream>
Marc Slemkoc7782972006-07-25 02:26:35 +000029#include <stdint.h>
Dmytro Shteflyukd925d342025-11-30 17:29:12 -050030#include <chrono>
31#include <thread>
Marc Slemko740343d2006-07-20 00:31:02 +000032
Konrad Grochowski16a23a62014-11-13 15:33:38 +010033namespace apache {
34namespace thrift {
35namespace concurrency {
36namespace test {
Marc Slemko740343d2006-07-20 00:31:02 +000037
T Jake Lucianib5e62212009-01-31 22:36:20 +000038using namespace apache::thrift::concurrency;
Marc Slemko740343d2006-07-20 00:31:02 +000039
cyy316723a2019-01-05 16:35:14 +080040static std::deque<std::shared_ptr<Runnable> > m_expired;
41static void expiredNotifier(std::shared_ptr<Runnable> runnable)
James E. King, IIIdf899132016-11-12 15:16:30 -050042{
43 m_expired.push_back(runnable);
44}
Marc Slemko740343d2006-07-20 00:31:02 +000045
/**
 * Block the calling thread for the given number of milliseconds.
 */
static void sleep_(int64_t millisec) {
  const std::chrono::milliseconds pause(millisec);
  std::this_thread::sleep_for(pause);
}
49
50class ThreadManagerTests {
Konrad Grochowski293a40e2014-09-04 17:28:17 +040051
Marc Slemko740343d2006-07-20 00:31:02 +000052public:
Konrad Grochowski16a23a62014-11-13 15:33:38 +010053 class Task : public Runnable {
Marc Slemko740343d2006-07-20 00:31:02 +000054
55 public:
Konrad Grochowski16a23a62014-11-13 15:33:38 +010056 Task(Monitor& monitor, size_t& count, int64_t timeout)
James E. King, III36200902016-10-05 14:47:18 -040057 : _monitor(monitor), _count(count), _timeout(timeout), _startTime(0), _endTime(0), _done(false) {}
Marc Slemko740343d2006-07-20 00:31:02 +000058
Sebastian Zenker042580f2019-01-29 15:48:12 +010059 void run() override {
Marc Slemko740343d2006-07-20 00:31:02 +000060
cyybfdbd032019-01-12 14:38:28 +080061 _startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
Marc Slemko740343d2006-07-20 00:31:02 +000062
James E. King, IIIdf899132016-11-12 15:16:30 -050063 sleep_(_timeout);
Marc Slemko740343d2006-07-20 00:31:02 +000064
cyybfdbd032019-01-12 14:38:28 +080065 _endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
Marc Slemkoc7782972006-07-25 02:26:35 +000066
Marc Slemko740343d2006-07-20 00:31:02 +000067 _done = true;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000068
Mark Sleef5f2be42006-09-05 21:05:31 +000069 {
70 Synchronized s(_monitor);
Marc Slemko740343d2006-07-20 00:31:02 +000071
CJCombrink4a280d52024-03-14 19:57:41 +010072 // std::cout << "Thread " << _count << " completed " << '\n';
Marc Slemko3a3b53b2007-05-22 23:59:54 +000073
David Reiss96d23882007-07-26 21:10:32 +000074 _count--;
James E. King, IIIdf899132016-11-12 15:16:30 -050075 if (_count % 10000 == 0) {
David Reiss96d23882007-07-26 21:10:32 +000076 _monitor.notify();
77 }
Marc Slemko740343d2006-07-20 00:31:02 +000078 }
79 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000080
Marc Slemko740343d2006-07-20 00:31:02 +000081 Monitor& _monitor;
82 size_t& _count;
Mark Slee9b82d272007-05-23 05:16:07 +000083 int64_t _timeout;
84 int64_t _startTime;
85 int64_t _endTime;
Marc Slemko740343d2006-07-20 00:31:02 +000086 bool _done;
Marc Slemkoc7782972006-07-25 02:26:35 +000087 Monitor _sleep;
Marc Slemko740343d2006-07-20 00:31:02 +000088 };
89
Mark Sleef5f2be42006-09-05 21:05:31 +000090 /**
91 * Dispatch count tasks, each of which blocks for timeout milliseconds then
92 * completes. Verify that all tasks completed and that thread manager cleans
93 * up properly on delete.
94 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +010095 bool loadTest(size_t count = 100, int64_t timeout = 100LL, size_t workerCount = 4) {
Marc Slemko740343d2006-07-20 00:31:02 +000096
97 Monitor monitor;
98
99 size_t activeCount = count;
100
Marc Slemko6f038a72006-08-03 18:58:09 +0000101 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
Marc Slemkoc7782972006-07-25 02:26:35 +0000102
cyyca8af9b2019-01-11 22:13:12 +0800103 shared_ptr<ThreadFactory> threadFactory
104 = shared_ptr<ThreadFactory>(new ThreadFactory(false));
Marc Slemkoc7782972006-07-25 02:26:35 +0000105
Marc Slemkoc7782972006-07-25 02:26:35 +0000106 threadManager->threadFactory(threadFactory);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000107
108 threadManager->start();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000109
Marc Slemko6f038a72006-08-03 18:58:09 +0000110 std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
Marc Slemko740343d2006-07-20 00:31:02 +0000111
Mark Sleef5f2be42006-09-05 21:05:31 +0000112 for (size_t ix = 0; ix < count; ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000113
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100114 tasks.insert(shared_ptr<ThreadManagerTests::Task>(
115 new ThreadManagerTests::Task(monitor, activeCount, timeout)));
Marc Slemko740343d2006-07-20 00:31:02 +0000116 }
117
cyybfdbd032019-01-12 14:38:28 +0800118 int64_t time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000119
Sebastian Zenker042580f2019-01-29 15:48:12 +0100120 for (auto ix = tasks.begin();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100121 ix != tasks.end();
122 ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000123
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100124 threadManager->add(*ix);
Marc Slemko740343d2006-07-20 00:31:02 +0000125 }
126
CJCombrink4a280d52024-03-14 19:57:41 +0100127 std::cout << "\t\t\t\tloaded " << count << " tasks to execute" << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500128
Mark Sleef5f2be42006-09-05 21:05:31 +0000129 {
130 Synchronized s(monitor);
131
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100132 while (activeCount > 0) {
CJCombrink4a280d52024-03-14 19:57:41 +0100133 std::cout << "\t\t\t\tactiveCount = " << activeCount << '\n';
David Reiss96d23882007-07-26 21:10:32 +0000134 monitor.wait();
Marc Slemko740343d2006-07-20 00:31:02 +0000135 }
136 }
137
cyybfdbd032019-01-12 14:38:28 +0800138 int64_t time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
Marc Slemko740343d2006-07-20 00:31:02 +0000139
Mark Slee9b82d272007-05-23 05:16:07 +0000140 int64_t firstTime = 9223372036854775807LL;
141 int64_t lastTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000142
143 double averageTime = 0;
Mark Slee9b82d272007-05-23 05:16:07 +0000144 int64_t minTime = 9223372036854775807LL;
145 int64_t maxTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000146
Sebastian Zenker042580f2019-01-29 15:48:12 +0100147 for (auto ix = tasks.begin();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100148 ix != tasks.end();
149 ix++) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000150
Marc Slemko6f038a72006-08-03 18:58:09 +0000151 shared_ptr<ThreadManagerTests::Task> task = *ix;
Marc Slemkoc7782972006-07-25 02:26:35 +0000152
Mark Slee9b82d272007-05-23 05:16:07 +0000153 int64_t delta = task->_endTime - task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000154
155 assert(delta > 0);
156
Mark Sleef5f2be42006-09-05 21:05:31 +0000157 if (task->_startTime < firstTime) {
David Reiss96d23882007-07-26 21:10:32 +0000158 firstTime = task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000159 }
160
Mark Sleef5f2be42006-09-05 21:05:31 +0000161 if (task->_endTime > lastTime) {
David Reiss96d23882007-07-26 21:10:32 +0000162 lastTime = task->_endTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000163 }
164
Mark Sleef5f2be42006-09-05 21:05:31 +0000165 if (delta < minTime) {
David Reiss96d23882007-07-26 21:10:32 +0000166 minTime = delta;
Marc Slemkoc7782972006-07-25 02:26:35 +0000167 }
168
Mark Sleef5f2be42006-09-05 21:05:31 +0000169 if (delta > maxTime) {
David Reiss96d23882007-07-26 21:10:32 +0000170 maxTime = delta;
Marc Slemkoc7782972006-07-25 02:26:35 +0000171 }
172
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100173 averageTime += delta;
Marc Slemko740343d2006-07-20 00:31:02 +0000174 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000175
Marc Slemkoc7782972006-07-25 02:26:35 +0000176 averageTime /= count;
177
James E. King, IIIdf899132016-11-12 15:16:30 -0500178 std::cout << "\t\t\tfirst start: " << firstTime << " Last end: " << lastTime
179 << " min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime
CJCombrink4a280d52024-03-14 19:57:41 +0100180 << "ms" << '\n';
Marc Slemko740343d2006-07-20 00:31:02 +0000181
James E. King, IIIdf899132016-11-12 15:16:30 -0500182 bool success = (time01 - time00) >= ((int64_t)count * timeout) / (int64_t)workerCount;
Marc Slemkoc7782972006-07-25 02:26:35 +0000183
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100184 std::cout << "\t\t\t" << (success ? "Success" : "Failure")
James E. King, IIIdf899132016-11-12 15:16:30 -0500185 << "! expected time: " << ((int64_t)count * timeout) / (int64_t)workerCount << "ms elapsed time: " << time01 - time00
CJCombrink4a280d52024-03-14 19:57:41 +0100186 << "ms" << '\n';
Marc Slemko740343d2006-07-20 00:31:02 +0000187
Marc Slemkoc7782972006-07-25 02:26:35 +0000188 return success;
Marc Slemko740343d2006-07-20 00:31:02 +0000189 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000190
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100191 class BlockTask : public Runnable {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000192
193 public:
James E. King, IIIdf899132016-11-12 15:16:30 -0500194 BlockTask(Monitor& entryMonitor, Monitor& blockMonitor, bool& blocked, Monitor& doneMonitor, size_t& count)
195 : _entryMonitor(entryMonitor), _entered(false), _blockMonitor(blockMonitor), _blocked(blocked), _doneMonitor(doneMonitor), _count(count) {}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000196
Sebastian Zenker042580f2019-01-29 15:48:12 +0100197 void run() override {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000198 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500199 Synchronized s(_entryMonitor);
200 _entered = true;
201 _entryMonitor.notify();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000202 }
203
204 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500205 Synchronized s(_blockMonitor);
206 while (_blocked) {
207 _blockMonitor.wait();
208 }
209 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000210
James E. King, IIIdf899132016-11-12 15:16:30 -0500211 {
212 Synchronized s(_doneMonitor);
213 if (--_count == 0) {
214 _doneMonitor.notify();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000215 }
216 }
217 }
218
James E. King, IIIdf899132016-11-12 15:16:30 -0500219 Monitor& _entryMonitor;
220 bool _entered;
221 Monitor& _blockMonitor;
222 bool& _blocked;
223 Monitor& _doneMonitor;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000224 size_t& _count;
225 };
226
227 /**
228 * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the
229 * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */
230
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100231 bool blockTest(int64_t timeout = 100LL, size_t workerCount = 2) {
232 (void)timeout;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000233 bool success = false;
234
235 try {
236
James E. King, IIIdf899132016-11-12 15:16:30 -0500237 Monitor entryMonitor; // not used by this test
238 Monitor blockMonitor;
239 bool blocked[] = {true, true, true};
240 Monitor doneMonitor;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000241
242 size_t pendingTaskMaxCount = workerCount;
243
244 size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
245
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100246 shared_ptr<ThreadManager> threadManager
247 = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000248
cyyca8af9b2019-01-11 22:13:12 +0800249 shared_ptr<ThreadFactory> threadFactory
250 = shared_ptr<ThreadFactory>(new ThreadFactory());
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000251
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000252 threadManager->threadFactory(threadFactory);
253
254 threadManager->start();
255
James E. King, IIIdf899132016-11-12 15:16:30 -0500256 std::vector<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
257 tasks.reserve(workerCount + pendingTaskMaxCount);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000258
259 for (size_t ix = 0; ix < workerCount; ix++) {
260
James E. King, IIIdf899132016-11-12 15:16:30 -0500261 tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>(
262 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[0], doneMonitor, activeCounts[0])));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000263 }
264
265 for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
266
James E. King, IIIdf899132016-11-12 15:16:30 -0500267 tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>(
268 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[1], doneMonitor, activeCounts[1])));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000269 }
270
Sebastian Zenker042580f2019-01-29 15:48:12 +0100271 for (auto ix = tasks.begin();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100272 ix != tasks.end();
273 ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000274 threadManager->add(*ix);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000275 }
276
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100277 if (!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000278 throw TException("Unexpected pending task count");
279 }
280
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100281 shared_ptr<ThreadManagerTests::BlockTask> extraTask(
James E. King, IIIdf899132016-11-12 15:16:30 -0500282 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[2], doneMonitor, activeCounts[2]));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000283
284 try {
285 threadManager->add(extraTask, 1);
286 throw TException("Unexpected success adding task in excess of pending task count");
ben-craigfae08e72015-07-15 11:34:47 -0500287 } catch (TooManyPendingTasksException&) {
David Reiss9fcacc82009-06-04 00:32:54 +0000288 throw TException("Should have timed out adding task in excess of pending task count");
ben-craigfae08e72015-07-15 11:34:47 -0500289 } catch (TimedOutException&) {
David Reiss9fcacc82009-06-04 00:32:54 +0000290 // Expected result
291 }
292
293 try {
294 threadManager->add(extraTask, -1);
295 throw TException("Unexpected success adding task in excess of pending task count");
ben-craigfae08e72015-07-15 11:34:47 -0500296 } catch (TimedOutException&) {
David Reiss9fcacc82009-06-04 00:32:54 +0000297 throw TException("Unexpected timeout adding task in excess of pending task count");
ben-craigfae08e72015-07-15 11:34:47 -0500298 } catch (TooManyPendingTasksException&) {
David Reiss9fcacc82009-06-04 00:32:54 +0000299 // Expected result
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000300 }
301
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100302 std::cout << "\t\t\t"
CJCombrink4a280d52024-03-14 19:57:41 +0100303 << "Pending tasks " << threadManager->pendingTaskCount() << '\n';
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000304
305 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500306 Synchronized s(blockMonitor);
307 blocked[0] = false;
308 blockMonitor.notifyAll();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000309 }
310
311 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500312 Synchronized s(doneMonitor);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100313 while (activeCounts[0] != 0) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500314 doneMonitor.wait();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000315 }
316 }
317
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100318 std::cout << "\t\t\t"
CJCombrink4a280d52024-03-14 19:57:41 +0100319 << "Pending tasks " << threadManager->pendingTaskCount() << '\n';
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000320
321 try {
322 threadManager->add(extraTask, 1);
ben-craigfae08e72015-07-15 11:34:47 -0500323 } catch (TimedOutException&) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100324 std::cout << "\t\t\t"
CJCombrink4a280d52024-03-14 19:57:41 +0100325 << "add timed out unexpectedly" << '\n';
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000326 throw TException("Unexpected timeout adding task");
327
ben-craigfae08e72015-07-15 11:34:47 -0500328 } catch (TooManyPendingTasksException&) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100329 std::cout << "\t\t\t"
CJCombrink4a280d52024-03-14 19:57:41 +0100330 << "add encountered too many pending exepctions" << '\n';
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000331 throw TException("Unexpected timeout adding task");
332 }
333
334 // Wake up tasks that were pending before and wait for them to complete
335
336 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500337 Synchronized s(blockMonitor);
338 blocked[1] = false;
339 blockMonitor.notifyAll();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000340 }
341
342 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500343 Synchronized s(doneMonitor);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100344 while (activeCounts[1] != 0) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500345 doneMonitor.wait();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000346 }
347 }
348
349 // Wake up the extra task and wait for it to complete
350
351 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500352 Synchronized s(blockMonitor);
353 blocked[2] = false;
354 blockMonitor.notifyAll();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000355 }
356
357 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500358 Synchronized s(doneMonitor);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100359 while (activeCounts[2] != 0) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500360 doneMonitor.wait();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000361 }
362 }
363
James E. King, IIIdf899132016-11-12 15:16:30 -0500364 threadManager->stop();
365
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100366 if (!(success = (threadManager->totalTaskCount() == 0))) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500367 throw TException("Unexpected total task count");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000368 }
369
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100370 } catch (TException& e) {
CJCombrink4a280d52024-03-14 19:57:41 +0100371 std::cout << "ERROR: " << e.what() << '\n';
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000372 }
373
CJCombrink4a280d52024-03-14 19:57:41 +0100374 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << '\n';
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000375 return success;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100376 }
James E. King, IIIdf899132016-11-12 15:16:30 -0500377
378
379 bool apiTest() {
380
381 // prove currentTime has milliseconds granularity since many other things depend on it
cyybfdbd032019-01-12 14:38:28 +0800382 int64_t a = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
James E. King, IIIdf899132016-11-12 15:16:30 -0500383 sleep_(100);
cyybfdbd032019-01-12 14:38:28 +0800384 int64_t b = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
James E. King, IIIdf899132016-11-12 15:16:30 -0500385 if (b - a < 50 || b - a > 150) {
CJCombrink4a280d52024-03-14 19:57:41 +0100386 std::cerr << "\t\t\texpected 100ms gap, found " << (b-a) << "ms gap instead." << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500387 return false;
388 }
389
cyyca8af9b2019-01-11 22:13:12 +0800390 return apiTestWithThreadFactory(shared_ptr<ThreadFactory>(new ThreadFactory()));
James E. King, IIIdf899132016-11-12 15:16:30 -0500391
392 }
393
cyyca8af9b2019-01-11 22:13:12 +0800394 bool apiTestWithThreadFactory(shared_ptr<ThreadFactory> threadFactory)
James E. King, IIIdf899132016-11-12 15:16:30 -0500395 {
396 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(1);
397 threadManager->threadFactory(threadFactory);
398
CJCombrink4a280d52024-03-14 19:57:41 +0100399 std::cout << "\t\t\t\tstarting.. " << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500400
401 threadManager->start();
cyy316723a2019-01-05 16:35:14 +0800402 threadManager->setExpireCallback(expiredNotifier); // std::bind(&ThreadManagerTests::expiredNotifier, this));
James E. King, IIIdf899132016-11-12 15:16:30 -0500403
CJCombrink4a280d52024-03-14 19:57:41 +0100404#define EXPECT(FUNC, COUNT) { size_t c = FUNC; if (c != COUNT) { std::cerr << "expected " #FUNC" to be " #COUNT ", but was " << c << '\n'; return false; } }
James E. King, IIIdf899132016-11-12 15:16:30 -0500405
406 EXPECT(threadManager->workerCount(), 1);
407 EXPECT(threadManager->idleWorkerCount(), 1);
408 EXPECT(threadManager->pendingTaskCount(), 0);
409
CJCombrink4a280d52024-03-14 19:57:41 +0100410 std::cout << "\t\t\t\tadd 2nd worker.. " << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500411
412 threadManager->addWorker();
413
414 EXPECT(threadManager->workerCount(), 2);
415 EXPECT(threadManager->idleWorkerCount(), 2);
416 EXPECT(threadManager->pendingTaskCount(), 0);
417
CJCombrink4a280d52024-03-14 19:57:41 +0100418 std::cout << "\t\t\t\tremove 2nd worker.. " << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500419
420 threadManager->removeWorker();
421
422 EXPECT(threadManager->workerCount(), 1);
423 EXPECT(threadManager->idleWorkerCount(), 1);
424 EXPECT(threadManager->pendingTaskCount(), 0);
425
CJCombrink4a280d52024-03-14 19:57:41 +0100426 std::cout << "\t\t\t\tremove 1st worker.. " << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500427
428 threadManager->removeWorker();
429
430 EXPECT(threadManager->workerCount(), 0);
431 EXPECT(threadManager->idleWorkerCount(), 0);
432 EXPECT(threadManager->pendingTaskCount(), 0);
433
CJCombrink4a280d52024-03-14 19:57:41 +0100434 std::cout << "\t\t\t\tadd blocking task.. " << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500435
436 // We're going to throw a blocking task into the mix
437 Monitor entryMonitor; // signaled when task is running
438 Monitor blockMonitor; // to be signaled to unblock the task
439 bool blocked(true); // set to false before notifying
440 Monitor doneMonitor; // signaled when count reaches zero
441 size_t activeCount = 1;
442 shared_ptr<ThreadManagerTests::BlockTask> blockingTask(
443 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked, doneMonitor, activeCount));
444 threadManager->add(blockingTask);
445
446 EXPECT(threadManager->workerCount(), 0);
447 EXPECT(threadManager->idleWorkerCount(), 0);
448 EXPECT(threadManager->pendingTaskCount(), 1);
449
CJCombrink4a280d52024-03-14 19:57:41 +0100450 std::cout << "\t\t\t\tadd other task.. " << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500451
452 shared_ptr<ThreadManagerTests::Task> otherTask(
453 new ThreadManagerTests::Task(doneMonitor, activeCount, 0));
454
455 threadManager->add(otherTask);
456
457 EXPECT(threadManager->workerCount(), 0);
458 EXPECT(threadManager->idleWorkerCount(), 0);
459 EXPECT(threadManager->pendingTaskCount(), 2);
460
CJCombrink4a280d52024-03-14 19:57:41 +0100461 std::cout << "\t\t\t\tremove blocking task specifically.. " << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500462
463 threadManager->remove(blockingTask);
464
465 EXPECT(threadManager->workerCount(), 0);
466 EXPECT(threadManager->idleWorkerCount(), 0);
467 EXPECT(threadManager->pendingTaskCount(), 1);
468
CJCombrink4a280d52024-03-14 19:57:41 +0100469 std::cout << "\t\t\t\tremove next pending task.." << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500470
471 shared_ptr<Runnable> nextTask = threadManager->removeNextPending();
472 if (nextTask != otherTask) {
CJCombrink4a280d52024-03-14 19:57:41 +0100473 std::cerr << "\t\t\t\t\texpected removeNextPending to return otherTask" << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500474 return false;
475 }
476
477 EXPECT(threadManager->workerCount(), 0);
478 EXPECT(threadManager->idleWorkerCount(), 0);
479 EXPECT(threadManager->pendingTaskCount(), 0);
480
CJCombrink4a280d52024-03-14 19:57:41 +0100481 std::cout << "\t\t\t\tremove next pending task (none left).." << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500482
483 nextTask = threadManager->removeNextPending();
484 if (nextTask) {
CJCombrink4a280d52024-03-14 19:57:41 +0100485 std::cerr << "\t\t\t\t\texpected removeNextPending to return an empty Runnable" << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500486 return false;
487 }
488
CJCombrink4a280d52024-03-14 19:57:41 +0100489 std::cout << "\t\t\t\tadd 2 expired tasks and 1 not.." << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500490
491 shared_ptr<ThreadManagerTests::Task> expiredTask(
492 new ThreadManagerTests::Task(doneMonitor, activeCount, 0));
493
494 threadManager->add(expiredTask, 0, 1);
495 threadManager->add(blockingTask); // add one that hasn't expired to make sure it gets skipped
496 threadManager->add(expiredTask, 0, 1); // add a second expired to ensure removeExpiredTasks removes both
497
498 sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1 millisecond
499
500 EXPECT(threadManager->workerCount(), 0);
501 EXPECT(threadManager->idleWorkerCount(), 0);
502 EXPECT(threadManager->pendingTaskCount(), 3);
503 EXPECT(threadManager->expiredTaskCount(), 0);
504
CJCombrink4a280d52024-03-14 19:57:41 +0100505 std::cout << "\t\t\t\tremove expired tasks.." << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500506
507 if (!m_expired.empty()) {
CJCombrink4a280d52024-03-14 19:57:41 +0100508 std::cerr << "\t\t\t\t\texpected m_expired to be empty" << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500509 return false;
510 }
511
512 threadManager->removeExpiredTasks();
513
514 if (m_expired.size() != 2) {
CJCombrink4a280d52024-03-14 19:57:41 +0100515 std::cerr << "\t\t\t\t\texpected m_expired to be set" << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500516 return false;
517 }
518
519 if (m_expired.front() != expiredTask) {
CJCombrink4a280d52024-03-14 19:57:41 +0100520 std::cerr << "\t\t\t\t\texpected m_expired[0] to be the expired task" << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500521 return false;
522 }
523 m_expired.pop_front();
524
525 if (m_expired.front() != expiredTask) {
CJCombrink4a280d52024-03-14 19:57:41 +0100526 std::cerr << "\t\t\t\t\texpected m_expired[1] to be the expired task" << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500527 return false;
528 }
529
530 m_expired.clear();
531
532 threadManager->remove(blockingTask);
533
534 EXPECT(threadManager->workerCount(), 0);
535 EXPECT(threadManager->idleWorkerCount(), 0);
536 EXPECT(threadManager->pendingTaskCount(), 0);
537 EXPECT(threadManager->expiredTaskCount(), 2);
538
CJCombrink4a280d52024-03-14 19:57:41 +0100539 std::cout << "\t\t\t\tadd expired task (again).." << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500540
541 threadManager->add(expiredTask, 0, 1); // expires in 1ms
542 sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1ms
543
CJCombrink4a280d52024-03-14 19:57:41 +0100544 std::cout << "\t\t\t\tadd worker to consume expired task.." << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500545
546 threadManager->addWorker();
547 sleep_(100); // make sure it has time to spin up and expire the task
548
549 if (m_expired.empty()) {
CJCombrink4a280d52024-03-14 19:57:41 +0100550 std::cerr << "\t\t\t\t\texpected m_expired to be set" << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500551 return false;
552 }
553
554 if (m_expired.front() != expiredTask) {
CJCombrink4a280d52024-03-14 19:57:41 +0100555 std::cerr << "\t\t\t\t\texpected m_expired to be the expired task" << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500556 return false;
557 }
558
559 m_expired.clear();
560
561 EXPECT(threadManager->workerCount(), 1);
562 EXPECT(threadManager->idleWorkerCount(), 1);
563 EXPECT(threadManager->pendingTaskCount(), 0);
564 EXPECT(threadManager->expiredTaskCount(), 3);
565
CJCombrink4a280d52024-03-14 19:57:41 +0100566 std::cout << "\t\t\t\ttry to remove too many workers" << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500567 try {
568 threadManager->removeWorker(2);
CJCombrink4a280d52024-03-14 19:57:41 +0100569 std::cerr << "\t\t\t\t\texpected InvalidArgumentException" << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500570 return false;
571 } catch (const InvalidArgumentException&) {
572 /* expected */
573 }
574
CJCombrink4a280d52024-03-14 19:57:41 +0100575 std::cout << "\t\t\t\tremove worker.. " << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500576
577 threadManager->removeWorker();
578
579 EXPECT(threadManager->workerCount(), 0);
580 EXPECT(threadManager->idleWorkerCount(), 0);
581 EXPECT(threadManager->pendingTaskCount(), 0);
582 EXPECT(threadManager->expiredTaskCount(), 3);
583
CJCombrink4a280d52024-03-14 19:57:41 +0100584 std::cout << "\t\t\t\tadd blocking task.. " << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500585
586 threadManager->add(blockingTask);
587
588 EXPECT(threadManager->workerCount(), 0);
589 EXPECT(threadManager->idleWorkerCount(), 0);
590 EXPECT(threadManager->pendingTaskCount(), 1);
591
CJCombrink4a280d52024-03-14 19:57:41 +0100592 std::cout << "\t\t\t\tadd worker.. " << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500593
594 threadManager->addWorker();
595 {
596 Synchronized s(entryMonitor);
597 while (!blockingTask->_entered) {
598 entryMonitor.wait();
599 }
600 }
601
602 EXPECT(threadManager->workerCount(), 1);
603 EXPECT(threadManager->idleWorkerCount(), 0);
604 EXPECT(threadManager->pendingTaskCount(), 0);
605
CJCombrink4a280d52024-03-14 19:57:41 +0100606 std::cout << "\t\t\t\tunblock task and remove worker.. " << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500607
608 {
609 Synchronized s(blockMonitor);
610 blocked = false;
611 blockMonitor.notifyAll();
612 }
613 threadManager->removeWorker();
614
615 EXPECT(threadManager->workerCount(), 0);
616 EXPECT(threadManager->idleWorkerCount(), 0);
617 EXPECT(threadManager->pendingTaskCount(), 0);
618
CJCombrink4a280d52024-03-14 19:57:41 +0100619 std::cout << "\t\t\t\tcleanup.. " << '\n';
James E. King, IIIdf899132016-11-12 15:16:30 -0500620
621 blockingTask.reset();
622 threadManager.reset();
623 return true;
624 }
Marc Slemko740343d2006-07-20 00:31:02 +0000625};
Marc Slemko6f038a72006-08-03 18:58:09 +0000626
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100627}
628}
629}
630} // apache::thrift::concurrency
Marc Slemko740343d2006-07-20 00:31:02 +0000631
T Jake Lucianib5e62212009-01-31 22:36:20 +0000632using namespace apache::thrift::concurrency::test;