blob: b3a319a57104f8147848852c107cd62a79e5567e [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Roger Meierba406d32013-07-15 22:41:34 +020020#include <thrift/thrift-config.h>
Roger Meier49ff8b12012-04-13 09:12:31 +000021#include <thrift/concurrency/ThreadManager.h>
cyyca8af9b2019-01-11 22:13:12 +080022#include <thrift/concurrency/ThreadFactory.h>
Roger Meier49ff8b12012-04-13 09:12:31 +000023#include <thrift/concurrency/Monitor.h>
24#include <thrift/concurrency/Util.h>
Marc Slemko740343d2006-07-20 00:31:02 +000025
26#include <assert.h>
James E. King, IIIdf899132016-11-12 15:16:30 -050027#include <deque>
Marc Slemko740343d2006-07-20 00:31:02 +000028#include <set>
29#include <iostream>
Marc Slemkoc7782972006-07-25 02:26:35 +000030#include <stdint.h>
Marc Slemko740343d2006-07-20 00:31:02 +000031
Konrad Grochowski16a23a62014-11-13 15:33:38 +010032namespace apache {
33namespace thrift {
34namespace concurrency {
35namespace test {
Marc Slemko740343d2006-07-20 00:31:02 +000036
T Jake Lucianib5e62212009-01-31 22:36:20 +000037using namespace apache::thrift::concurrency;
Marc Slemko740343d2006-07-20 00:31:02 +000038
cyy316723a2019-01-05 16:35:14 +080039static std::deque<std::shared_ptr<Runnable> > m_expired;
40static void expiredNotifier(std::shared_ptr<Runnable> runnable)
James E. King, IIIdf899132016-11-12 15:16:30 -050041{
42 m_expired.push_back(runnable);
43}
Marc Slemko740343d2006-07-20 00:31:02 +000044
James E. King, IIIdf899132016-11-12 15:16:30 -050045static void sleep_(int64_t millisec) {
46 Monitor _sleep;
47 Synchronized s(_sleep);
48
49 try {
50 _sleep.wait(millisec);
51 } catch (TimedOutException&) {
52 ;
53 } catch (...) {
54 assert(0);
55 }
56}
57
58class ThreadManagerTests {
Konrad Grochowski293a40e2014-09-04 17:28:17 +040059
Marc Slemko740343d2006-07-20 00:31:02 +000060public:
Konrad Grochowski16a23a62014-11-13 15:33:38 +010061 class Task : public Runnable {
Marc Slemko740343d2006-07-20 00:31:02 +000062
63 public:
Konrad Grochowski16a23a62014-11-13 15:33:38 +010064 Task(Monitor& monitor, size_t& count, int64_t timeout)
James E. King, III36200902016-10-05 14:47:18 -040065 : _monitor(monitor), _count(count), _timeout(timeout), _startTime(0), _endTime(0), _done(false) {}
Marc Slemko740343d2006-07-20 00:31:02 +000066
67 void run() {
68
Marc Slemkoc7782972006-07-25 02:26:35 +000069 _startTime = Util::currentTime();
Marc Slemko740343d2006-07-20 00:31:02 +000070
James E. King, IIIdf899132016-11-12 15:16:30 -050071 sleep_(_timeout);
Marc Slemko740343d2006-07-20 00:31:02 +000072
Marc Slemkoc7782972006-07-25 02:26:35 +000073 _endTime = Util::currentTime();
74
Marc Slemko740343d2006-07-20 00:31:02 +000075 _done = true;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000076
Mark Sleef5f2be42006-09-05 21:05:31 +000077 {
78 Synchronized s(_monitor);
Marc Slemko740343d2006-07-20 00:31:02 +000079
David Reiss96d23882007-07-26 21:10:32 +000080 // std::cout << "Thread " << _count << " completed " << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000081
David Reiss96d23882007-07-26 21:10:32 +000082 _count--;
James E. King, IIIdf899132016-11-12 15:16:30 -050083 if (_count % 10000 == 0) {
David Reiss96d23882007-07-26 21:10:32 +000084 _monitor.notify();
85 }
Marc Slemko740343d2006-07-20 00:31:02 +000086 }
87 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000088
Marc Slemko740343d2006-07-20 00:31:02 +000089 Monitor& _monitor;
90 size_t& _count;
Mark Slee9b82d272007-05-23 05:16:07 +000091 int64_t _timeout;
92 int64_t _startTime;
93 int64_t _endTime;
Marc Slemko740343d2006-07-20 00:31:02 +000094 bool _done;
Marc Slemkoc7782972006-07-25 02:26:35 +000095 Monitor _sleep;
Marc Slemko740343d2006-07-20 00:31:02 +000096 };
97
Mark Sleef5f2be42006-09-05 21:05:31 +000098 /**
99 * Dispatch count tasks, each of which blocks for timeout milliseconds then
100 * completes. Verify that all tasks completed and that thread manager cleans
101 * up properly on delete.
102 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100103 bool loadTest(size_t count = 100, int64_t timeout = 100LL, size_t workerCount = 4) {
Marc Slemko740343d2006-07-20 00:31:02 +0000104
105 Monitor monitor;
106
107 size_t activeCount = count;
108
Marc Slemko6f038a72006-08-03 18:58:09 +0000109 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
Marc Slemkoc7782972006-07-25 02:26:35 +0000110
cyyca8af9b2019-01-11 22:13:12 +0800111 shared_ptr<ThreadFactory> threadFactory
112 = shared_ptr<ThreadFactory>(new ThreadFactory(false));
Marc Slemkoc7782972006-07-25 02:26:35 +0000113
Marc Slemkoc7782972006-07-25 02:26:35 +0000114 threadManager->threadFactory(threadFactory);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000115
116 threadManager->start();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000117
Marc Slemko6f038a72006-08-03 18:58:09 +0000118 std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
Marc Slemko740343d2006-07-20 00:31:02 +0000119
Mark Sleef5f2be42006-09-05 21:05:31 +0000120 for (size_t ix = 0; ix < count; ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000121
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100122 tasks.insert(shared_ptr<ThreadManagerTests::Task>(
123 new ThreadManagerTests::Task(monitor, activeCount, timeout)));
Marc Slemko740343d2006-07-20 00:31:02 +0000124 }
125
Mark Slee9b82d272007-05-23 05:16:07 +0000126 int64_t time00 = Util::currentTime();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000127
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100128 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin();
129 ix != tasks.end();
130 ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000131
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100132 threadManager->add(*ix);
Marc Slemko740343d2006-07-20 00:31:02 +0000133 }
134
James E. King, IIIdf899132016-11-12 15:16:30 -0500135 std::cout << "\t\t\t\tloaded " << count << " tasks to execute" << std::endl;
136
Mark Sleef5f2be42006-09-05 21:05:31 +0000137 {
138 Synchronized s(monitor);
139
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100140 while (activeCount > 0) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500141 std::cout << "\t\t\t\tactiveCount = " << activeCount << std::endl;
David Reiss96d23882007-07-26 21:10:32 +0000142 monitor.wait();
Marc Slemko740343d2006-07-20 00:31:02 +0000143 }
144 }
145
Mark Slee9b82d272007-05-23 05:16:07 +0000146 int64_t time01 = Util::currentTime();
Marc Slemko740343d2006-07-20 00:31:02 +0000147
Mark Slee9b82d272007-05-23 05:16:07 +0000148 int64_t firstTime = 9223372036854775807LL;
149 int64_t lastTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000150
151 double averageTime = 0;
Mark Slee9b82d272007-05-23 05:16:07 +0000152 int64_t minTime = 9223372036854775807LL;
153 int64_t maxTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000154
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100155 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin();
156 ix != tasks.end();
157 ix++) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000158
Marc Slemko6f038a72006-08-03 18:58:09 +0000159 shared_ptr<ThreadManagerTests::Task> task = *ix;
Marc Slemkoc7782972006-07-25 02:26:35 +0000160
Mark Slee9b82d272007-05-23 05:16:07 +0000161 int64_t delta = task->_endTime - task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000162
163 assert(delta > 0);
164
Mark Sleef5f2be42006-09-05 21:05:31 +0000165 if (task->_startTime < firstTime) {
David Reiss96d23882007-07-26 21:10:32 +0000166 firstTime = task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000167 }
168
Mark Sleef5f2be42006-09-05 21:05:31 +0000169 if (task->_endTime > lastTime) {
David Reiss96d23882007-07-26 21:10:32 +0000170 lastTime = task->_endTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000171 }
172
Mark Sleef5f2be42006-09-05 21:05:31 +0000173 if (delta < minTime) {
David Reiss96d23882007-07-26 21:10:32 +0000174 minTime = delta;
Marc Slemkoc7782972006-07-25 02:26:35 +0000175 }
176
Mark Sleef5f2be42006-09-05 21:05:31 +0000177 if (delta > maxTime) {
David Reiss96d23882007-07-26 21:10:32 +0000178 maxTime = delta;
Marc Slemkoc7782972006-07-25 02:26:35 +0000179 }
180
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100181 averageTime += delta;
Marc Slemko740343d2006-07-20 00:31:02 +0000182 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000183
Marc Slemkoc7782972006-07-25 02:26:35 +0000184 averageTime /= count;
185
James E. King, IIIdf899132016-11-12 15:16:30 -0500186 std::cout << "\t\t\tfirst start: " << firstTime << " Last end: " << lastTime
187 << " min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100188 << "ms" << std::endl;
Marc Slemko740343d2006-07-20 00:31:02 +0000189
James E. King, IIIdf899132016-11-12 15:16:30 -0500190 bool success = (time01 - time00) >= ((int64_t)count * timeout) / (int64_t)workerCount;
Marc Slemkoc7782972006-07-25 02:26:35 +0000191
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100192 std::cout << "\t\t\t" << (success ? "Success" : "Failure")
James E. King, IIIdf899132016-11-12 15:16:30 -0500193 << "! expected time: " << ((int64_t)count * timeout) / (int64_t)workerCount << "ms elapsed time: " << time01 - time00
194 << "ms" << std::endl;
Marc Slemko740343d2006-07-20 00:31:02 +0000195
Marc Slemkoc7782972006-07-25 02:26:35 +0000196 return success;
Marc Slemko740343d2006-07-20 00:31:02 +0000197 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000198
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100199 class BlockTask : public Runnable {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000200
201 public:
James E. King, IIIdf899132016-11-12 15:16:30 -0500202 BlockTask(Monitor& entryMonitor, Monitor& blockMonitor, bool& blocked, Monitor& doneMonitor, size_t& count)
203 : _entryMonitor(entryMonitor), _entered(false), _blockMonitor(blockMonitor), _blocked(blocked), _doneMonitor(doneMonitor), _count(count) {}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000204
205 void run() {
206 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500207 Synchronized s(_entryMonitor);
208 _entered = true;
209 _entryMonitor.notify();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000210 }
211
212 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500213 Synchronized s(_blockMonitor);
214 while (_blocked) {
215 _blockMonitor.wait();
216 }
217 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000218
James E. King, IIIdf899132016-11-12 15:16:30 -0500219 {
220 Synchronized s(_doneMonitor);
221 if (--_count == 0) {
222 _doneMonitor.notify();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000223 }
224 }
225 }
226
James E. King, IIIdf899132016-11-12 15:16:30 -0500227 Monitor& _entryMonitor;
228 bool _entered;
229 Monitor& _blockMonitor;
230 bool& _blocked;
231 Monitor& _doneMonitor;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000232 size_t& _count;
233 };
234
235 /**
236 * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the
237 * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */
238
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100239 bool blockTest(int64_t timeout = 100LL, size_t workerCount = 2) {
240 (void)timeout;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000241 bool success = false;
242
243 try {
244
James E. King, IIIdf899132016-11-12 15:16:30 -0500245 Monitor entryMonitor; // not used by this test
246 Monitor blockMonitor;
247 bool blocked[] = {true, true, true};
248 Monitor doneMonitor;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000249
250 size_t pendingTaskMaxCount = workerCount;
251
252 size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
253
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100254 shared_ptr<ThreadManager> threadManager
255 = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000256
cyyca8af9b2019-01-11 22:13:12 +0800257 shared_ptr<ThreadFactory> threadFactory
258 = shared_ptr<ThreadFactory>(new ThreadFactory());
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000259
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000260 threadManager->threadFactory(threadFactory);
261
262 threadManager->start();
263
James E. King, IIIdf899132016-11-12 15:16:30 -0500264 std::vector<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
265 tasks.reserve(workerCount + pendingTaskMaxCount);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000266
267 for (size_t ix = 0; ix < workerCount; ix++) {
268
James E. King, IIIdf899132016-11-12 15:16:30 -0500269 tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>(
270 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[0], doneMonitor, activeCounts[0])));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000271 }
272
273 for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
274
James E. King, IIIdf899132016-11-12 15:16:30 -0500275 tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>(
276 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[1], doneMonitor, activeCounts[1])));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000277 }
278
James E. King, IIIdf899132016-11-12 15:16:30 -0500279 for (std::vector<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100280 ix != tasks.end();
281 ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000282 threadManager->add(*ix);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000283 }
284
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100285 if (!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000286 throw TException("Unexpected pending task count");
287 }
288
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100289 shared_ptr<ThreadManagerTests::BlockTask> extraTask(
James E. King, IIIdf899132016-11-12 15:16:30 -0500290 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[2], doneMonitor, activeCounts[2]));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000291
292 try {
293 threadManager->add(extraTask, 1);
294 throw TException("Unexpected success adding task in excess of pending task count");
ben-craigfae08e72015-07-15 11:34:47 -0500295 } catch (TooManyPendingTasksException&) {
David Reiss9fcacc82009-06-04 00:32:54 +0000296 throw TException("Should have timed out adding task in excess of pending task count");
ben-craigfae08e72015-07-15 11:34:47 -0500297 } catch (TimedOutException&) {
David Reiss9fcacc82009-06-04 00:32:54 +0000298 // Expected result
299 }
300
301 try {
302 threadManager->add(extraTask, -1);
303 throw TException("Unexpected success adding task in excess of pending task count");
ben-craigfae08e72015-07-15 11:34:47 -0500304 } catch (TimedOutException&) {
David Reiss9fcacc82009-06-04 00:32:54 +0000305 throw TException("Unexpected timeout adding task in excess of pending task count");
ben-craigfae08e72015-07-15 11:34:47 -0500306 } catch (TooManyPendingTasksException&) {
David Reiss9fcacc82009-06-04 00:32:54 +0000307 // Expected result
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000308 }
309
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100310 std::cout << "\t\t\t"
311 << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000312
313 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500314 Synchronized s(blockMonitor);
315 blocked[0] = false;
316 blockMonitor.notifyAll();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000317 }
318
319 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500320 Synchronized s(doneMonitor);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100321 while (activeCounts[0] != 0) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500322 doneMonitor.wait();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000323 }
324 }
325
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100326 std::cout << "\t\t\t"
327 << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000328
329 try {
330 threadManager->add(extraTask, 1);
ben-craigfae08e72015-07-15 11:34:47 -0500331 } catch (TimedOutException&) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100332 std::cout << "\t\t\t"
333 << "add timed out unexpectedly" << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000334 throw TException("Unexpected timeout adding task");
335
ben-craigfae08e72015-07-15 11:34:47 -0500336 } catch (TooManyPendingTasksException&) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100337 std::cout << "\t\t\t"
338 << "add encountered too many pending exepctions" << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000339 throw TException("Unexpected timeout adding task");
340 }
341
342 // Wake up tasks that were pending before and wait for them to complete
343
344 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500345 Synchronized s(blockMonitor);
346 blocked[1] = false;
347 blockMonitor.notifyAll();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000348 }
349
350 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500351 Synchronized s(doneMonitor);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100352 while (activeCounts[1] != 0) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500353 doneMonitor.wait();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000354 }
355 }
356
357 // Wake up the extra task and wait for it to complete
358
359 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500360 Synchronized s(blockMonitor);
361 blocked[2] = false;
362 blockMonitor.notifyAll();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000363 }
364
365 {
James E. King, IIIdf899132016-11-12 15:16:30 -0500366 Synchronized s(doneMonitor);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100367 while (activeCounts[2] != 0) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500368 doneMonitor.wait();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000369 }
370 }
371
James E. King, IIIdf899132016-11-12 15:16:30 -0500372 threadManager->stop();
373
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100374 if (!(success = (threadManager->totalTaskCount() == 0))) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500375 throw TException("Unexpected total task count");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000376 }
377
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100378 } catch (TException& e) {
David Reiss9fcacc82009-06-04 00:32:54 +0000379 std::cout << "ERROR: " << e.what() << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000380 }
381
382 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
383 return success;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100384 }
James E. King, IIIdf899132016-11-12 15:16:30 -0500385
386
387 bool apiTest() {
388
389 // prove currentTime has milliseconds granularity since many other things depend on it
390 int64_t a = Util::currentTime();
391 sleep_(100);
392 int64_t b = Util::currentTime();
393 if (b - a < 50 || b - a > 150) {
394 std::cerr << "\t\t\texpected 100ms gap, found " << (b-a) << "ms gap instead." << std::endl;
395 return false;
396 }
397
cyyca8af9b2019-01-11 22:13:12 +0800398 return apiTestWithThreadFactory(shared_ptr<ThreadFactory>(new ThreadFactory()));
James E. King, IIIdf899132016-11-12 15:16:30 -0500399
400 }
401
cyyca8af9b2019-01-11 22:13:12 +0800402 bool apiTestWithThreadFactory(shared_ptr<ThreadFactory> threadFactory)
James E. King, IIIdf899132016-11-12 15:16:30 -0500403 {
404 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(1);
405 threadManager->threadFactory(threadFactory);
406
James E. King, IIIdf899132016-11-12 15:16:30 -0500407 std::cout << "\t\t\t\tstarting.. " << std::endl;
408
409 threadManager->start();
cyy316723a2019-01-05 16:35:14 +0800410 threadManager->setExpireCallback(expiredNotifier); // std::bind(&ThreadManagerTests::expiredNotifier, this));
James E. King, IIIdf899132016-11-12 15:16:30 -0500411
412#define EXPECT(FUNC, COUNT) { size_t c = FUNC; if (c != COUNT) { std::cerr << "expected " #FUNC" to be " #COUNT ", but was " << c << std::endl; return false; } }
413
414 EXPECT(threadManager->workerCount(), 1);
415 EXPECT(threadManager->idleWorkerCount(), 1);
416 EXPECT(threadManager->pendingTaskCount(), 0);
417
418 std::cout << "\t\t\t\tadd 2nd worker.. " << std::endl;
419
420 threadManager->addWorker();
421
422 EXPECT(threadManager->workerCount(), 2);
423 EXPECT(threadManager->idleWorkerCount(), 2);
424 EXPECT(threadManager->pendingTaskCount(), 0);
425
426 std::cout << "\t\t\t\tremove 2nd worker.. " << std::endl;
427
428 threadManager->removeWorker();
429
430 EXPECT(threadManager->workerCount(), 1);
431 EXPECT(threadManager->idleWorkerCount(), 1);
432 EXPECT(threadManager->pendingTaskCount(), 0);
433
434 std::cout << "\t\t\t\tremove 1st worker.. " << std::endl;
435
436 threadManager->removeWorker();
437
438 EXPECT(threadManager->workerCount(), 0);
439 EXPECT(threadManager->idleWorkerCount(), 0);
440 EXPECT(threadManager->pendingTaskCount(), 0);
441
442 std::cout << "\t\t\t\tadd blocking task.. " << std::endl;
443
444 // We're going to throw a blocking task into the mix
445 Monitor entryMonitor; // signaled when task is running
446 Monitor blockMonitor; // to be signaled to unblock the task
447 bool blocked(true); // set to false before notifying
448 Monitor doneMonitor; // signaled when count reaches zero
449 size_t activeCount = 1;
450 shared_ptr<ThreadManagerTests::BlockTask> blockingTask(
451 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked, doneMonitor, activeCount));
452 threadManager->add(blockingTask);
453
454 EXPECT(threadManager->workerCount(), 0);
455 EXPECT(threadManager->idleWorkerCount(), 0);
456 EXPECT(threadManager->pendingTaskCount(), 1);
457
458 std::cout << "\t\t\t\tadd other task.. " << std::endl;
459
460 shared_ptr<ThreadManagerTests::Task> otherTask(
461 new ThreadManagerTests::Task(doneMonitor, activeCount, 0));
462
463 threadManager->add(otherTask);
464
465 EXPECT(threadManager->workerCount(), 0);
466 EXPECT(threadManager->idleWorkerCount(), 0);
467 EXPECT(threadManager->pendingTaskCount(), 2);
468
469 std::cout << "\t\t\t\tremove blocking task specifically.. " << std::endl;
470
471 threadManager->remove(blockingTask);
472
473 EXPECT(threadManager->workerCount(), 0);
474 EXPECT(threadManager->idleWorkerCount(), 0);
475 EXPECT(threadManager->pendingTaskCount(), 1);
476
477 std::cout << "\t\t\t\tremove next pending task.." << std::endl;
478
479 shared_ptr<Runnable> nextTask = threadManager->removeNextPending();
480 if (nextTask != otherTask) {
481 std::cerr << "\t\t\t\t\texpected removeNextPending to return otherTask" << std::endl;
482 return false;
483 }
484
485 EXPECT(threadManager->workerCount(), 0);
486 EXPECT(threadManager->idleWorkerCount(), 0);
487 EXPECT(threadManager->pendingTaskCount(), 0);
488
489 std::cout << "\t\t\t\tremove next pending task (none left).." << std::endl;
490
491 nextTask = threadManager->removeNextPending();
492 if (nextTask) {
493 std::cerr << "\t\t\t\t\texpected removeNextPending to return an empty Runnable" << std::endl;
494 return false;
495 }
496
497 std::cout << "\t\t\t\tadd 2 expired tasks and 1 not.." << std::endl;
498
499 shared_ptr<ThreadManagerTests::Task> expiredTask(
500 new ThreadManagerTests::Task(doneMonitor, activeCount, 0));
501
502 threadManager->add(expiredTask, 0, 1);
503 threadManager->add(blockingTask); // add one that hasn't expired to make sure it gets skipped
504 threadManager->add(expiredTask, 0, 1); // add a second expired to ensure removeExpiredTasks removes both
505
506 sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1 millisecond
507
508 EXPECT(threadManager->workerCount(), 0);
509 EXPECT(threadManager->idleWorkerCount(), 0);
510 EXPECT(threadManager->pendingTaskCount(), 3);
511 EXPECT(threadManager->expiredTaskCount(), 0);
512
513 std::cout << "\t\t\t\tremove expired tasks.." << std::endl;
514
515 if (!m_expired.empty()) {
516 std::cerr << "\t\t\t\t\texpected m_expired to be empty" << std::endl;
517 return false;
518 }
519
520 threadManager->removeExpiredTasks();
521
522 if (m_expired.size() != 2) {
523 std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl;
524 return false;
525 }
526
527 if (m_expired.front() != expiredTask) {
528 std::cerr << "\t\t\t\t\texpected m_expired[0] to be the expired task" << std::endl;
529 return false;
530 }
531 m_expired.pop_front();
532
533 if (m_expired.front() != expiredTask) {
534 std::cerr << "\t\t\t\t\texpected m_expired[1] to be the expired task" << std::endl;
535 return false;
536 }
537
538 m_expired.clear();
539
540 threadManager->remove(blockingTask);
541
542 EXPECT(threadManager->workerCount(), 0);
543 EXPECT(threadManager->idleWorkerCount(), 0);
544 EXPECT(threadManager->pendingTaskCount(), 0);
545 EXPECT(threadManager->expiredTaskCount(), 2);
546
547 std::cout << "\t\t\t\tadd expired task (again).." << std::endl;
548
549 threadManager->add(expiredTask, 0, 1); // expires in 1ms
550 sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1ms
551
552 std::cout << "\t\t\t\tadd worker to consume expired task.." << std::endl;
553
554 threadManager->addWorker();
555 sleep_(100); // make sure it has time to spin up and expire the task
556
557 if (m_expired.empty()) {
558 std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl;
559 return false;
560 }
561
562 if (m_expired.front() != expiredTask) {
563 std::cerr << "\t\t\t\t\texpected m_expired to be the expired task" << std::endl;
564 return false;
565 }
566
567 m_expired.clear();
568
569 EXPECT(threadManager->workerCount(), 1);
570 EXPECT(threadManager->idleWorkerCount(), 1);
571 EXPECT(threadManager->pendingTaskCount(), 0);
572 EXPECT(threadManager->expiredTaskCount(), 3);
573
574 std::cout << "\t\t\t\ttry to remove too many workers" << std::endl;
575 try {
576 threadManager->removeWorker(2);
577 std::cerr << "\t\t\t\t\texpected InvalidArgumentException" << std::endl;
578 return false;
579 } catch (const InvalidArgumentException&) {
580 /* expected */
581 }
582
583 std::cout << "\t\t\t\tremove worker.. " << std::endl;
584
585 threadManager->removeWorker();
586
587 EXPECT(threadManager->workerCount(), 0);
588 EXPECT(threadManager->idleWorkerCount(), 0);
589 EXPECT(threadManager->pendingTaskCount(), 0);
590 EXPECT(threadManager->expiredTaskCount(), 3);
591
592 std::cout << "\t\t\t\tadd blocking task.. " << std::endl;
593
594 threadManager->add(blockingTask);
595
596 EXPECT(threadManager->workerCount(), 0);
597 EXPECT(threadManager->idleWorkerCount(), 0);
598 EXPECT(threadManager->pendingTaskCount(), 1);
599
600 std::cout << "\t\t\t\tadd worker.. " << std::endl;
601
602 threadManager->addWorker();
603 {
604 Synchronized s(entryMonitor);
605 while (!blockingTask->_entered) {
606 entryMonitor.wait();
607 }
608 }
609
610 EXPECT(threadManager->workerCount(), 1);
611 EXPECT(threadManager->idleWorkerCount(), 0);
612 EXPECT(threadManager->pendingTaskCount(), 0);
613
614 std::cout << "\t\t\t\tunblock task and remove worker.. " << std::endl;
615
616 {
617 Synchronized s(blockMonitor);
618 blocked = false;
619 blockMonitor.notifyAll();
620 }
621 threadManager->removeWorker();
622
623 EXPECT(threadManager->workerCount(), 0);
624 EXPECT(threadManager->idleWorkerCount(), 0);
625 EXPECT(threadManager->pendingTaskCount(), 0);
626
627 std::cout << "\t\t\t\tcleanup.. " << std::endl;
628
629 blockingTask.reset();
630 threadManager.reset();
631 return true;
632 }
Marc Slemko740343d2006-07-20 00:31:02 +0000633};
Marc Slemko6f038a72006-08-03 18:58:09 +0000634
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100635}
636}
637}
638} // apache::thrift::concurrency
Marc Slemko740343d2006-07-20 00:31:02 +0000639
T Jake Lucianib5e62212009-01-31 22:36:20 +0000640using namespace apache::thrift::concurrency::test;