blob: e9ed75653bc7607f94901e5fe05114ed2911000b [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Roger Meierba406d32013-07-15 22:41:34 +020020#include <thrift/thrift-config.h>
Roger Meier49ff8b12012-04-13 09:12:31 +000021#include <thrift/concurrency/ThreadManager.h>
cyyca8af9b2019-01-11 22:13:12 +080022#include <thrift/concurrency/ThreadFactory.h>
Roger Meier49ff8b12012-04-13 09:12:31 +000023#include <thrift/concurrency/Monitor.h>
Marc Slemko740343d2006-07-20 00:31:02 +000024
25#include <assert.h>
James E. King, IIIdf899132016-11-12 15:16:30 -050026#include <deque>
Marc Slemko740343d2006-07-20 00:31:02 +000027#include <set>
28#include <iostream>
Marc Slemkoc7782972006-07-25 02:26:35 +000029#include <stdint.h>
Marc Slemko740343d2006-07-20 00:31:02 +000030
Konrad Grochowski16a23a62014-11-13 15:33:38 +010031namespace apache {
32namespace thrift {
33namespace concurrency {
34namespace test {
Marc Slemko740343d2006-07-20 00:31:02 +000035
T Jake Lucianib5e62212009-01-31 22:36:20 +000036using namespace apache::thrift::concurrency;
Marc Slemko740343d2006-07-20 00:31:02 +000037
cyy316723a2019-01-05 16:35:14 +080038static std::deque<std::shared_ptr<Runnable> > m_expired;
39static void expiredNotifier(std::shared_ptr<Runnable> runnable)
James E. King, IIIdf899132016-11-12 15:16:30 -050040{
41 m_expired.push_back(runnable);
42}
Marc Slemko740343d2006-07-20 00:31:02 +000043
James E. King, IIIdf899132016-11-12 15:16:30 -050044static void sleep_(int64_t millisec) {
45 Monitor _sleep;
46 Synchronized s(_sleep);
47
48 try {
49 _sleep.wait(millisec);
50 } catch (TimedOutException&) {
51 ;
52 } catch (...) {
53 assert(0);
54 }
55}
56
57class ThreadManagerTests {
Konrad Grochowski293a40e2014-09-04 17:28:17 +040058
Marc Slemko740343d2006-07-20 00:31:02 +000059public:
Konrad Grochowski16a23a62014-11-13 15:33:38 +010060 class Task : public Runnable {
Marc Slemko740343d2006-07-20 00:31:02 +000061
62 public:
Konrad Grochowski16a23a62014-11-13 15:33:38 +010063 Task(Monitor& monitor, size_t& count, int64_t timeout)
James E. King, III36200902016-10-05 14:47:18 -040064 : _monitor(monitor), _count(count), _timeout(timeout), _startTime(0), _endTime(0), _done(false) {}
Marc Slemko740343d2006-07-20 00:31:02 +000065
66 void run() {
67
cyybfdbd032019-01-12 14:38:28 +080068 _startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
Marc Slemko740343d2006-07-20 00:31:02 +000069
James E. King, IIIdf899132016-11-12 15:16:30 -050070 sleep_(_timeout);
Marc Slemko740343d2006-07-20 00:31:02 +000071
cyybfdbd032019-01-12 14:38:28 +080072 _endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
Marc Slemkoc7782972006-07-25 02:26:35 +000073
Marc Slemko740343d2006-07-20 00:31:02 +000074 _done = true;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000075
Mark Sleef5f2be42006-09-05 21:05:31 +000076 {
77 Synchronized s(_monitor);
Marc Slemko740343d2006-07-20 00:31:02 +000078
David Reiss96d23882007-07-26 21:10:32 +000079 // std::cout << "Thread " << _count << " completed " << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000080
David Reiss96d23882007-07-26 21:10:32 +000081 _count--;
James E. King, IIIdf899132016-11-12 15:16:30 -050082 if (_count % 10000 == 0) {
David Reiss96d23882007-07-26 21:10:32 +000083 _monitor.notify();
84 }
Marc Slemko740343d2006-07-20 00:31:02 +000085 }
86 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000087
Marc Slemko740343d2006-07-20 00:31:02 +000088 Monitor& _monitor;
89 size_t& _count;
Mark Slee9b82d272007-05-23 05:16:07 +000090 int64_t _timeout;
91 int64_t _startTime;
92 int64_t _endTime;
Marc Slemko740343d2006-07-20 00:31:02 +000093 bool _done;
Marc Slemkoc7782972006-07-25 02:26:35 +000094 Monitor _sleep;
Marc Slemko740343d2006-07-20 00:31:02 +000095 };
96
Mark Sleef5f2be42006-09-05 21:05:31 +000097 /**
98 * Dispatch count tasks, each of which blocks for timeout milliseconds then
99 * completes. Verify that all tasks completed and that thread manager cleans
100 * up properly on delete.
101 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100102 bool loadTest(size_t count = 100, int64_t timeout = 100LL, size_t workerCount = 4) {
Marc Slemko740343d2006-07-20 00:31:02 +0000103
104 Monitor monitor;
105
106 size_t activeCount = count;
107
Marc Slemko6f038a72006-08-03 18:58:09 +0000108 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
Marc Slemkoc7782972006-07-25 02:26:35 +0000109
cyyca8af9b2019-01-11 22:13:12 +0800110 shared_ptr<ThreadFactory> threadFactory
111 = shared_ptr<ThreadFactory>(new ThreadFactory(false));
Marc Slemkoc7782972006-07-25 02:26:35 +0000112
Marc Slemkoc7782972006-07-25 02:26:35 +0000113 threadManager->threadFactory(threadFactory);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000114
115 threadManager->start();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000116
Marc Slemko6f038a72006-08-03 18:58:09 +0000117 std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
Marc Slemko740343d2006-07-20 00:31:02 +0000118
Mark Sleef5f2be42006-09-05 21:05:31 +0000119 for (size_t ix = 0; ix < count; ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000120
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100121 tasks.insert(shared_ptr<ThreadManagerTests::Task>(
122 new ThreadManagerTests::Task(monitor, activeCount, timeout)));
Marc Slemko740343d2006-07-20 00:31:02 +0000123 }
124
cyybfdbd032019-01-12 14:38:28 +0800125 int64_t time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000126
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100127 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin();
128 ix != tasks.end();
129 ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000130
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100131 threadManager->add(*ix);
Marc Slemko740343d2006-07-20 00:31:02 +0000132 }
133
James E. King, IIIdf899132016-11-12 15:16:30 -0500134 std::cout << "\t\t\t\tloaded " << count << " tasks to execute" << std::endl;
135
Mark Sleef5f2be42006-09-05 21:05:31 +0000136 {
137 Synchronized s(monitor);
138
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100139 while (activeCount > 0) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500140 std::cout << "\t\t\t\tactiveCount = " << activeCount << std::endl;
David Reiss96d23882007-07-26 21:10:32 +0000141 monitor.wait();
Marc Slemko740343d2006-07-20 00:31:02 +0000142 }
143 }
144
cyybfdbd032019-01-12 14:38:28 +0800145 int64_t time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
Marc Slemko740343d2006-07-20 00:31:02 +0000146
Mark Slee9b82d272007-05-23 05:16:07 +0000147 int64_t firstTime = 9223372036854775807LL;
148 int64_t lastTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000149
150 double averageTime = 0;
Mark Slee9b82d272007-05-23 05:16:07 +0000151 int64_t minTime = 9223372036854775807LL;
152 int64_t maxTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000153
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100154 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin();
155 ix != tasks.end();
156 ix++) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000157
Marc Slemko6f038a72006-08-03 18:58:09 +0000158 shared_ptr<ThreadManagerTests::Task> task = *ix;
Marc Slemkoc7782972006-07-25 02:26:35 +0000159
Mark Slee9b82d272007-05-23 05:16:07 +0000160 int64_t delta = task->_endTime - task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000161
162 assert(delta > 0);
163
Mark Sleef5f2be42006-09-05 21:05:31 +0000164 if (task->_startTime < firstTime) {
David Reiss96d23882007-07-26 21:10:32 +0000165 firstTime = task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000166 }
167
Mark Sleef5f2be42006-09-05 21:05:31 +0000168 if (task->_endTime > lastTime) {
David Reiss96d23882007-07-26 21:10:32 +0000169 lastTime = task->_endTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000170 }
171
Mark Sleef5f2be42006-09-05 21:05:31 +0000172 if (delta < minTime) {
David Reiss96d23882007-07-26 21:10:32 +0000173 minTime = delta;
Marc Slemkoc7782972006-07-25 02:26:35 +0000174 }
175
Mark Sleef5f2be42006-09-05 21:05:31 +0000176 if (delta > maxTime) {
David Reiss96d23882007-07-26 21:10:32 +0000177 maxTime = delta;
Marc Slemkoc7782972006-07-25 02:26:35 +0000178 }
179
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100180 averageTime += delta;
Marc Slemko740343d2006-07-20 00:31:02 +0000181 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000182
Marc Slemkoc7782972006-07-25 02:26:35 +0000183 averageTime /= count;
184
James E. King, IIIdf899132016-11-12 15:16:30 -0500185 std::cout << "\t\t\tfirst start: " << firstTime << " Last end: " << lastTime
186 << " min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100187 << "ms" << std::endl;
Marc Slemko740343d2006-07-20 00:31:02 +0000188
James E. King, IIIdf899132016-11-12 15:16:30 -0500189 bool success = (time01 - time00) >= ((int64_t)count * timeout) / (int64_t)workerCount;
Marc Slemkoc7782972006-07-25 02:26:35 +0000190
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100191 std::cout << "\t\t\t" << (success ? "Success" : "Failure")
James E. King, IIIdf899132016-11-12 15:16:30 -0500192 << "! expected time: " << ((int64_t)count * timeout) / (int64_t)workerCount << "ms elapsed time: " << time01 - time00
193 << "ms" << std::endl;
Marc Slemko740343d2006-07-20 00:31:02 +0000194
Marc Slemkoc7782972006-07-25 02:26:35 +0000195 return success;
Marc Slemko740343d2006-07-20 00:31:02 +0000196 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000197
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100198 class BlockTask : public Runnable {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000199
200 public:
James E. King, IIIdf899132016-11-12 15:16:30 -0500201 BlockTask(Monitor& entryMonitor, Monitor& blockMonitor, bool& blocked, Monitor& doneMonitor, size_t& count)
202 : _entryMonitor(entryMonitor), _entered(false), _blockMonitor(blockMonitor), _blocked(blocked), _doneMonitor(doneMonitor), _count(count) {}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000203
204 void run() {
205 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500206 Synchronized s(_entryMonitor);
207 _entered = true;
208 _entryMonitor.notify();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000209 }
210
211 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500212 Synchronized s(_blockMonitor);
213 while (_blocked) {
214 _blockMonitor.wait();
215 }
216 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000217
James E. King, IIIdf899132016-11-12 15:16:30 -0500218 {
219 Synchronized s(_doneMonitor);
220 if (--_count == 0) {
221 _doneMonitor.notify();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000222 }
223 }
224 }
225
James E. King, IIIdf899132016-11-12 15:16:30 -0500226 Monitor& _entryMonitor;
227 bool _entered;
228 Monitor& _blockMonitor;
229 bool& _blocked;
230 Monitor& _doneMonitor;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000231 size_t& _count;
232 };
233
234 /**
235 * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the
236 * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */
237
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100238 bool blockTest(int64_t timeout = 100LL, size_t workerCount = 2) {
239 (void)timeout;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000240 bool success = false;
241
242 try {
243
James E. King, IIIdf899132016-11-12 15:16:30 -0500244 Monitor entryMonitor; // not used by this test
245 Monitor blockMonitor;
246 bool blocked[] = {true, true, true};
247 Monitor doneMonitor;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000248
249 size_t pendingTaskMaxCount = workerCount;
250
251 size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
252
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100253 shared_ptr<ThreadManager> threadManager
254 = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000255
cyyca8af9b2019-01-11 22:13:12 +0800256 shared_ptr<ThreadFactory> threadFactory
257 = shared_ptr<ThreadFactory>(new ThreadFactory());
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000258
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000259 threadManager->threadFactory(threadFactory);
260
261 threadManager->start();
262
James E. King, IIIdf899132016-11-12 15:16:30 -0500263 std::vector<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
264 tasks.reserve(workerCount + pendingTaskMaxCount);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000265
266 for (size_t ix = 0; ix < workerCount; ix++) {
267
James E. King, IIIdf899132016-11-12 15:16:30 -0500268 tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>(
269 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[0], doneMonitor, activeCounts[0])));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000270 }
271
272 for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
273
James E. King, IIIdf899132016-11-12 15:16:30 -0500274 tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>(
275 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[1], doneMonitor, activeCounts[1])));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000276 }
277
James E. King, IIIdf899132016-11-12 15:16:30 -0500278 for (std::vector<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100279 ix != tasks.end();
280 ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000281 threadManager->add(*ix);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000282 }
283
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100284 if (!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000285 throw TException("Unexpected pending task count");
286 }
287
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100288 shared_ptr<ThreadManagerTests::BlockTask> extraTask(
James E. King, IIIdf899132016-11-12 15:16:30 -0500289 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[2], doneMonitor, activeCounts[2]));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000290
291 try {
292 threadManager->add(extraTask, 1);
293 throw TException("Unexpected success adding task in excess of pending task count");
ben-craigfae08e72015-07-15 11:34:47 -0500294 } catch (TooManyPendingTasksException&) {
David Reiss9fcacc82009-06-04 00:32:54 +0000295 throw TException("Should have timed out adding task in excess of pending task count");
ben-craigfae08e72015-07-15 11:34:47 -0500296 } catch (TimedOutException&) {
David Reiss9fcacc82009-06-04 00:32:54 +0000297 // Expected result
298 }
299
300 try {
301 threadManager->add(extraTask, -1);
302 throw TException("Unexpected success adding task in excess of pending task count");
ben-craigfae08e72015-07-15 11:34:47 -0500303 } catch (TimedOutException&) {
David Reiss9fcacc82009-06-04 00:32:54 +0000304 throw TException("Unexpected timeout adding task in excess of pending task count");
ben-craigfae08e72015-07-15 11:34:47 -0500305 } catch (TooManyPendingTasksException&) {
David Reiss9fcacc82009-06-04 00:32:54 +0000306 // Expected result
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000307 }
308
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100309 std::cout << "\t\t\t"
310 << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000311
312 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500313 Synchronized s(blockMonitor);
314 blocked[0] = false;
315 blockMonitor.notifyAll();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000316 }
317
318 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500319 Synchronized s(doneMonitor);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100320 while (activeCounts[0] != 0) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500321 doneMonitor.wait();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000322 }
323 }
324
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100325 std::cout << "\t\t\t"
326 << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000327
328 try {
329 threadManager->add(extraTask, 1);
ben-craigfae08e72015-07-15 11:34:47 -0500330 } catch (TimedOutException&) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100331 std::cout << "\t\t\t"
332 << "add timed out unexpectedly" << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000333 throw TException("Unexpected timeout adding task");
334
ben-craigfae08e72015-07-15 11:34:47 -0500335 } catch (TooManyPendingTasksException&) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100336 std::cout << "\t\t\t"
337 << "add encountered too many pending exepctions" << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000338 throw TException("Unexpected timeout adding task");
339 }
340
341 // Wake up tasks that were pending before and wait for them to complete
342
343 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500344 Synchronized s(blockMonitor);
345 blocked[1] = false;
346 blockMonitor.notifyAll();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000347 }
348
349 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500350 Synchronized s(doneMonitor);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100351 while (activeCounts[1] != 0) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500352 doneMonitor.wait();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000353 }
354 }
355
356 // Wake up the extra task and wait for it to complete
357
358 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500359 Synchronized s(blockMonitor);
360 blocked[2] = false;
361 blockMonitor.notifyAll();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000362 }
363
364 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500365 Synchronized s(doneMonitor);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100366 while (activeCounts[2] != 0) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500367 doneMonitor.wait();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000368 }
369 }
370
James E. King, IIIdf899132016-11-12 15:16:30 -0500371 threadManager->stop();
372
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100373 if (!(success = (threadManager->totalTaskCount() == 0))) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500374 throw TException("Unexpected total task count");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000375 }
376
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100377 } catch (TException& e) {
David Reiss9fcacc82009-06-04 00:32:54 +0000378 std::cout << "ERROR: " << e.what() << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000379 }
380
381 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
382 return success;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100383 }
James E. King, IIIdf899132016-11-12 15:16:30 -0500384
385
386 bool apiTest() {
387
388 // prove currentTime has milliseconds granularity since many other things depend on it
cyybfdbd032019-01-12 14:38:28 +0800389 int64_t a = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
James E. King, IIIdf899132016-11-12 15:16:30 -0500390 sleep_(100);
cyybfdbd032019-01-12 14:38:28 +0800391 int64_t b = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
James E. King, IIIdf899132016-11-12 15:16:30 -0500392 if (b - a < 50 || b - a > 150) {
393 std::cerr << "\t\t\texpected 100ms gap, found " << (b-a) << "ms gap instead." << std::endl;
394 return false;
395 }
396
cyyca8af9b2019-01-11 22:13:12 +0800397 return apiTestWithThreadFactory(shared_ptr<ThreadFactory>(new ThreadFactory()));
James E. King, IIIdf899132016-11-12 15:16:30 -0500398
399 }
400
cyyca8af9b2019-01-11 22:13:12 +0800401 bool apiTestWithThreadFactory(shared_ptr<ThreadFactory> threadFactory)
James E. King, IIIdf899132016-11-12 15:16:30 -0500402 {
403 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(1);
404 threadManager->threadFactory(threadFactory);
405
James E. King, IIIdf899132016-11-12 15:16:30 -0500406 std::cout << "\t\t\t\tstarting.. " << std::endl;
407
408 threadManager->start();
cyy316723a2019-01-05 16:35:14 +0800409 threadManager->setExpireCallback(expiredNotifier); // std::bind(&ThreadManagerTests::expiredNotifier, this));
James E. King, IIIdf899132016-11-12 15:16:30 -0500410
411#define EXPECT(FUNC, COUNT) { size_t c = FUNC; if (c != COUNT) { std::cerr << "expected " #FUNC" to be " #COUNT ", but was " << c << std::endl; return false; } }
412
413 EXPECT(threadManager->workerCount(), 1);
414 EXPECT(threadManager->idleWorkerCount(), 1);
415 EXPECT(threadManager->pendingTaskCount(), 0);
416
417 std::cout << "\t\t\t\tadd 2nd worker.. " << std::endl;
418
419 threadManager->addWorker();
420
421 EXPECT(threadManager->workerCount(), 2);
422 EXPECT(threadManager->idleWorkerCount(), 2);
423 EXPECT(threadManager->pendingTaskCount(), 0);
424
425 std::cout << "\t\t\t\tremove 2nd worker.. " << std::endl;
426
427 threadManager->removeWorker();
428
429 EXPECT(threadManager->workerCount(), 1);
430 EXPECT(threadManager->idleWorkerCount(), 1);
431 EXPECT(threadManager->pendingTaskCount(), 0);
432
433 std::cout << "\t\t\t\tremove 1st worker.. " << std::endl;
434
435 threadManager->removeWorker();
436
437 EXPECT(threadManager->workerCount(), 0);
438 EXPECT(threadManager->idleWorkerCount(), 0);
439 EXPECT(threadManager->pendingTaskCount(), 0);
440
441 std::cout << "\t\t\t\tadd blocking task.. " << std::endl;
442
443 // We're going to throw a blocking task into the mix
444 Monitor entryMonitor; // signaled when task is running
445 Monitor blockMonitor; // to be signaled to unblock the task
446 bool blocked(true); // set to false before notifying
447 Monitor doneMonitor; // signaled when count reaches zero
448 size_t activeCount = 1;
449 shared_ptr<ThreadManagerTests::BlockTask> blockingTask(
450 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked, doneMonitor, activeCount));
451 threadManager->add(blockingTask);
452
453 EXPECT(threadManager->workerCount(), 0);
454 EXPECT(threadManager->idleWorkerCount(), 0);
455 EXPECT(threadManager->pendingTaskCount(), 1);
456
457 std::cout << "\t\t\t\tadd other task.. " << std::endl;
458
459 shared_ptr<ThreadManagerTests::Task> otherTask(
460 new ThreadManagerTests::Task(doneMonitor, activeCount, 0));
461
462 threadManager->add(otherTask);
463
464 EXPECT(threadManager->workerCount(), 0);
465 EXPECT(threadManager->idleWorkerCount(), 0);
466 EXPECT(threadManager->pendingTaskCount(), 2);
467
468 std::cout << "\t\t\t\tremove blocking task specifically.. " << std::endl;
469
470 threadManager->remove(blockingTask);
471
472 EXPECT(threadManager->workerCount(), 0);
473 EXPECT(threadManager->idleWorkerCount(), 0);
474 EXPECT(threadManager->pendingTaskCount(), 1);
475
476 std::cout << "\t\t\t\tremove next pending task.." << std::endl;
477
478 shared_ptr<Runnable> nextTask = threadManager->removeNextPending();
479 if (nextTask != otherTask) {
480 std::cerr << "\t\t\t\t\texpected removeNextPending to return otherTask" << std::endl;
481 return false;
482 }
483
484 EXPECT(threadManager->workerCount(), 0);
485 EXPECT(threadManager->idleWorkerCount(), 0);
486 EXPECT(threadManager->pendingTaskCount(), 0);
487
488 std::cout << "\t\t\t\tremove next pending task (none left).." << std::endl;
489
490 nextTask = threadManager->removeNextPending();
491 if (nextTask) {
492 std::cerr << "\t\t\t\t\texpected removeNextPending to return an empty Runnable" << std::endl;
493 return false;
494 }
495
496 std::cout << "\t\t\t\tadd 2 expired tasks and 1 not.." << std::endl;
497
498 shared_ptr<ThreadManagerTests::Task> expiredTask(
499 new ThreadManagerTests::Task(doneMonitor, activeCount, 0));
500
501 threadManager->add(expiredTask, 0, 1);
502 threadManager->add(blockingTask); // add one that hasn't expired to make sure it gets skipped
503 threadManager->add(expiredTask, 0, 1); // add a second expired to ensure removeExpiredTasks removes both
504
505 sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1 millisecond
506
507 EXPECT(threadManager->workerCount(), 0);
508 EXPECT(threadManager->idleWorkerCount(), 0);
509 EXPECT(threadManager->pendingTaskCount(), 3);
510 EXPECT(threadManager->expiredTaskCount(), 0);
511
512 std::cout << "\t\t\t\tremove expired tasks.." << std::endl;
513
514 if (!m_expired.empty()) {
515 std::cerr << "\t\t\t\t\texpected m_expired to be empty" << std::endl;
516 return false;
517 }
518
519 threadManager->removeExpiredTasks();
520
521 if (m_expired.size() != 2) {
522 std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl;
523 return false;
524 }
525
526 if (m_expired.front() != expiredTask) {
527 std::cerr << "\t\t\t\t\texpected m_expired[0] to be the expired task" << std::endl;
528 return false;
529 }
530 m_expired.pop_front();
531
532 if (m_expired.front() != expiredTask) {
533 std::cerr << "\t\t\t\t\texpected m_expired[1] to be the expired task" << std::endl;
534 return false;
535 }
536
537 m_expired.clear();
538
539 threadManager->remove(blockingTask);
540
541 EXPECT(threadManager->workerCount(), 0);
542 EXPECT(threadManager->idleWorkerCount(), 0);
543 EXPECT(threadManager->pendingTaskCount(), 0);
544 EXPECT(threadManager->expiredTaskCount(), 2);
545
546 std::cout << "\t\t\t\tadd expired task (again).." << std::endl;
547
548 threadManager->add(expiredTask, 0, 1); // expires in 1ms
549 sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1ms
550
551 std::cout << "\t\t\t\tadd worker to consume expired task.." << std::endl;
552
553 threadManager->addWorker();
554 sleep_(100); // make sure it has time to spin up and expire the task
555
556 if (m_expired.empty()) {
557 std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl;
558 return false;
559 }
560
561 if (m_expired.front() != expiredTask) {
562 std::cerr << "\t\t\t\t\texpected m_expired to be the expired task" << std::endl;
563 return false;
564 }
565
566 m_expired.clear();
567
568 EXPECT(threadManager->workerCount(), 1);
569 EXPECT(threadManager->idleWorkerCount(), 1);
570 EXPECT(threadManager->pendingTaskCount(), 0);
571 EXPECT(threadManager->expiredTaskCount(), 3);
572
573 std::cout << "\t\t\t\ttry to remove too many workers" << std::endl;
574 try {
575 threadManager->removeWorker(2);
576 std::cerr << "\t\t\t\t\texpected InvalidArgumentException" << std::endl;
577 return false;
578 } catch (const InvalidArgumentException&) {
579 /* expected */
580 }
581
582 std::cout << "\t\t\t\tremove worker.. " << std::endl;
583
584 threadManager->removeWorker();
585
586 EXPECT(threadManager->workerCount(), 0);
587 EXPECT(threadManager->idleWorkerCount(), 0);
588 EXPECT(threadManager->pendingTaskCount(), 0);
589 EXPECT(threadManager->expiredTaskCount(), 3);
590
591 std::cout << "\t\t\t\tadd blocking task.. " << std::endl;
592
593 threadManager->add(blockingTask);
594
595 EXPECT(threadManager->workerCount(), 0);
596 EXPECT(threadManager->idleWorkerCount(), 0);
597 EXPECT(threadManager->pendingTaskCount(), 1);
598
599 std::cout << "\t\t\t\tadd worker.. " << std::endl;
600
601 threadManager->addWorker();
602 {
603 Synchronized s(entryMonitor);
604 while (!blockingTask->_entered) {
605 entryMonitor.wait();
606 }
607 }
608
609 EXPECT(threadManager->workerCount(), 1);
610 EXPECT(threadManager->idleWorkerCount(), 0);
611 EXPECT(threadManager->pendingTaskCount(), 0);
612
613 std::cout << "\t\t\t\tunblock task and remove worker.. " << std::endl;
614
615 {
616 Synchronized s(blockMonitor);
617 blocked = false;
618 blockMonitor.notifyAll();
619 }
620 threadManager->removeWorker();
621
622 EXPECT(threadManager->workerCount(), 0);
623 EXPECT(threadManager->idleWorkerCount(), 0);
624 EXPECT(threadManager->pendingTaskCount(), 0);
625
626 std::cout << "\t\t\t\tcleanup.. " << std::endl;
627
628 blockingTask.reset();
629 threadManager.reset();
630 return true;
631 }
Marc Slemko740343d2006-07-20 00:31:02 +0000632};
Marc Slemko6f038a72006-08-03 18:58:09 +0000633
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100634}
635}
636}
637} // apache::thrift::concurrency
Marc Slemko740343d2006-07-20 00:31:02 +0000638
T Jake Lucianib5e62212009-01-31 22:36:20 +0000639using namespace apache::thrift::concurrency::test;