blob: c08448b4ff6d80cd7919d315828a795883917d58 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Roger Meierba406d32013-07-15 22:41:34 +020020#include <thrift/thrift-config.h>
Roger Meier49ff8b12012-04-13 09:12:31 +000021#include <thrift/concurrency/ThreadManager.h>
22#include <thrift/concurrency/PlatformThreadFactory.h>
23#include <thrift/concurrency/Monitor.h>
24#include <thrift/concurrency/Util.h>
Marc Slemko740343d2006-07-20 00:31:02 +000025
26#include <assert.h>
27#include <set>
28#include <iostream>
Marc Slemkoc7782972006-07-25 02:26:35 +000029#include <set>
30#include <stdint.h>
Marc Slemko740343d2006-07-20 00:31:02 +000031
Konrad Grochowski240120c2014-11-18 11:33:31 +010032namespace apache { namespace thrift { namespace concurrency { namespace test {
Marc Slemko740343d2006-07-20 00:31:02 +000033
T Jake Lucianib5e62212009-01-31 22:36:20 +000034using namespace apache::thrift::concurrency;
Marc Slemko740343d2006-07-20 00:31:02 +000035
Marc Slemko740343d2006-07-20 00:31:02 +000036class ThreadManagerTests {
37
Konrad Grochowski293a40e2014-09-04 17:28:17 +040038 static const double TEST_TOLERANCE;
39
Marc Slemko740343d2006-07-20 00:31:02 +000040public:
Konrad Grochowski240120c2014-11-18 11:33:31 +010041 class Task: public Runnable {
Marc Slemko740343d2006-07-20 00:31:02 +000042
43 public:
Konrad Grochowski240120c2014-11-18 11:33:31 +010044
45 Task(Monitor& monitor, size_t& count, int64_t timeout) :
46 _monitor(monitor),
47 _count(count),
48 _timeout(timeout),
49 _done(false) {}
Marc Slemko740343d2006-07-20 00:31:02 +000050
51 void run() {
52
Marc Slemkoc7782972006-07-25 02:26:35 +000053 _startTime = Util::currentTime();
Marc Slemko740343d2006-07-20 00:31:02 +000054
Mark Sleef5f2be42006-09-05 21:05:31 +000055 {
56 Synchronized s(_sleep);
Marc Slemko740343d2006-07-20 00:31:02 +000057
Marc Slemko3a3b53b2007-05-22 23:59:54 +000058 try {
59 _sleep.wait(_timeout);
Konrad Grochowski240120c2014-11-18 11:33:31 +010060 } catch(TimedOutException& e) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000061 ;
Konrad Grochowski240120c2014-11-18 11:33:31 +010062 }catch(...) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000063 assert(0);
64 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000065 }
Marc Slemko740343d2006-07-20 00:31:02 +000066
Marc Slemkoc7782972006-07-25 02:26:35 +000067 _endTime = Util::currentTime();
68
Marc Slemko740343d2006-07-20 00:31:02 +000069 _done = true;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000070
Mark Sleef5f2be42006-09-05 21:05:31 +000071 {
72 Synchronized s(_monitor);
Marc Slemko740343d2006-07-20 00:31:02 +000073
David Reiss96d23882007-07-26 21:10:32 +000074 // std::cout << "Thread " << _count << " completed " << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000075
David Reiss96d23882007-07-26 21:10:32 +000076 _count--;
Marc Slemko740343d2006-07-20 00:31:02 +000077
David Reiss96d23882007-07-26 21:10:32 +000078 if (_count == 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000079
David Reiss96d23882007-07-26 21:10:32 +000080 _monitor.notify();
81 }
Marc Slemko740343d2006-07-20 00:31:02 +000082 }
83 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000084
Marc Slemko740343d2006-07-20 00:31:02 +000085 Monitor& _monitor;
86 size_t& _count;
Mark Slee9b82d272007-05-23 05:16:07 +000087 int64_t _timeout;
88 int64_t _startTime;
89 int64_t _endTime;
Marc Slemko740343d2006-07-20 00:31:02 +000090 bool _done;
Marc Slemkoc7782972006-07-25 02:26:35 +000091 Monitor _sleep;
Marc Slemko740343d2006-07-20 00:31:02 +000092 };
93
Mark Sleef5f2be42006-09-05 21:05:31 +000094 /**
95 * Dispatch count tasks, each of which blocks for timeout milliseconds then
96 * completes. Verify that all tasks completed and that thread manager cleans
97 * up properly on delete.
98 */
Konrad Grochowski240120c2014-11-18 11:33:31 +010099 bool loadTest(size_t count=100, int64_t timeout=100LL, size_t workerCount=4) {
Marc Slemko740343d2006-07-20 00:31:02 +0000100
101 Monitor monitor;
102
103 size_t activeCount = count;
104
Marc Slemko6f038a72006-08-03 18:58:09 +0000105 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
Marc Slemkoc7782972006-07-25 02:26:35 +0000106
Konrad Grochowski240120c2014-11-18 11:33:31 +0100107 shared_ptr<PlatformThreadFactory> threadFactory = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
Marc Slemkoc7782972006-07-25 02:26:35 +0000108
Roger Meier3faaedf2011-10-02 10:51:45 +0000109#ifndef USE_BOOST_THREAD
Marc Slemkoa6479032007-06-05 22:20:14 +0000110 threadFactory->setPriority(PosixThreadFactory::HIGHEST);
Roger Meier3faaedf2011-10-02 10:51:45 +0000111#endif
Marc Slemkoc7782972006-07-25 02:26:35 +0000112 threadManager->threadFactory(threadFactory);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000113
114 threadManager->start();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000115
Marc Slemko6f038a72006-08-03 18:58:09 +0000116 std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
Marc Slemko740343d2006-07-20 00:31:02 +0000117
Mark Sleef5f2be42006-09-05 21:05:31 +0000118 for (size_t ix = 0; ix < count; ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000119
Konrad Grochowski240120c2014-11-18 11:33:31 +0100120 tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));
Marc Slemko740343d2006-07-20 00:31:02 +0000121 }
122
Mark Slee9b82d272007-05-23 05:16:07 +0000123 int64_t time00 = Util::currentTime();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000124
Konrad Grochowski240120c2014-11-18 11:33:31 +0100125 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000126
Konrad Grochowski240120c2014-11-18 11:33:31 +0100127 threadManager->add(*ix);
Marc Slemko740343d2006-07-20 00:31:02 +0000128 }
129
Mark Sleef5f2be42006-09-05 21:05:31 +0000130 {
131 Synchronized s(monitor);
132
Konrad Grochowski240120c2014-11-18 11:33:31 +0100133 while(activeCount > 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000134
David Reiss96d23882007-07-26 21:10:32 +0000135 monitor.wait();
Marc Slemko740343d2006-07-20 00:31:02 +0000136 }
137 }
138
Mark Slee9b82d272007-05-23 05:16:07 +0000139 int64_t time01 = Util::currentTime();
Marc Slemko740343d2006-07-20 00:31:02 +0000140
Mark Slee9b82d272007-05-23 05:16:07 +0000141 int64_t firstTime = 9223372036854775807LL;
142 int64_t lastTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000143
144 double averageTime = 0;
Mark Slee9b82d272007-05-23 05:16:07 +0000145 int64_t minTime = 9223372036854775807LL;
146 int64_t maxTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000147
Konrad Grochowski240120c2014-11-18 11:33:31 +0100148 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000149
Marc Slemko6f038a72006-08-03 18:58:09 +0000150 shared_ptr<ThreadManagerTests::Task> task = *ix;
Marc Slemkoc7782972006-07-25 02:26:35 +0000151
Mark Slee9b82d272007-05-23 05:16:07 +0000152 int64_t delta = task->_endTime - task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000153
154 assert(delta > 0);
155
Mark Sleef5f2be42006-09-05 21:05:31 +0000156 if (task->_startTime < firstTime) {
David Reiss96d23882007-07-26 21:10:32 +0000157 firstTime = task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000158 }
159
Mark Sleef5f2be42006-09-05 21:05:31 +0000160 if (task->_endTime > lastTime) {
David Reiss96d23882007-07-26 21:10:32 +0000161 lastTime = task->_endTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000162 }
163
Mark Sleef5f2be42006-09-05 21:05:31 +0000164 if (delta < minTime) {
David Reiss96d23882007-07-26 21:10:32 +0000165 minTime = delta;
Marc Slemkoc7782972006-07-25 02:26:35 +0000166 }
167
Mark Sleef5f2be42006-09-05 21:05:31 +0000168 if (delta > maxTime) {
David Reiss96d23882007-07-26 21:10:32 +0000169 maxTime = delta;
Marc Slemkoc7782972006-07-25 02:26:35 +0000170 }
171
Konrad Grochowski240120c2014-11-18 11:33:31 +0100172 averageTime+= delta;
Marc Slemko740343d2006-07-20 00:31:02 +0000173 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000174
Marc Slemkoc7782972006-07-25 02:26:35 +0000175 averageTime /= count;
176
Konrad Grochowski240120c2014-11-18 11:33:31 +0100177 std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl;
Marc Slemko740343d2006-07-20 00:31:02 +0000178
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000179 double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout;
180
181 double error = ((time01 - time00) - expectedTime) / expectedTime;
182
Mark Sleef5f2be42006-09-05 21:05:31 +0000183 if (error < 0) {
Konrad Grochowski240120c2014-11-18 11:33:31 +0100184 error*= -1.0;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000185 }
186
Konrad Grochowski293a40e2014-09-04 17:28:17 +0400187 bool success = error < TEST_TOLERANCE;
Marc Slemkoc7782972006-07-25 02:26:35 +0000188
Konrad Grochowski240120c2014-11-18 11:33:31 +0100189 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl;
Marc Slemko740343d2006-07-20 00:31:02 +0000190
Marc Slemkoc7782972006-07-25 02:26:35 +0000191 return success;
Marc Slemko740343d2006-07-20 00:31:02 +0000192 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000193
Konrad Grochowski240120c2014-11-18 11:33:31 +0100194 class BlockTask: public Runnable {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000195
196 public:
Konrad Grochowski240120c2014-11-18 11:33:31 +0100197
198 BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) :
199 _monitor(monitor),
200 _bmonitor(bmonitor),
201 _count(count) {}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000202
203 void run() {
204 {
205 Synchronized s(_bmonitor);
206
207 _bmonitor.wait();
Konrad Grochowski240120c2014-11-18 11:33:31 +0100208
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000209 }
210
211 {
212 Synchronized s(_monitor);
213
214 _count--;
215
216 if (_count == 0) {
217
218 _monitor.notify();
219 }
220 }
221 }
222
223 Monitor& _monitor;
224 Monitor& _bmonitor;
225 size_t& _count;
226 };
227
228 /**
229 * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the
230 * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */
231
Konrad Grochowski240120c2014-11-18 11:33:31 +0100232 bool blockTest(int64_t timeout=100LL, size_t workerCount=2) {
233 (void) timeout;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000234 bool success = false;
235
236 try {
237
238 Monitor bmonitor;
239 Monitor monitor;
240
241 size_t pendingTaskMaxCount = workerCount;
242
243 size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
244
Konrad Grochowski240120c2014-11-18 11:33:31 +0100245 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000246
Konrad Grochowski240120c2014-11-18 11:33:31 +0100247 shared_ptr<PlatformThreadFactory> threadFactory = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000248
Roger Meier3faaedf2011-10-02 10:51:45 +0000249#ifndef USE_BOOST_THREAD
Marc Slemkoa6479032007-06-05 22:20:14 +0000250 threadFactory->setPriority(PosixThreadFactory::HIGHEST);
Roger Meier3faaedf2011-10-02 10:51:45 +0000251#endif
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000252 threadManager->threadFactory(threadFactory);
253
254 threadManager->start();
255
256 std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
257
258 for (size_t ix = 0; ix < workerCount; ix++) {
259
Konrad Grochowski240120c2014-11-18 11:33:31 +0100260 tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0])));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000261 }
262
263 for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
264
Konrad Grochowski240120c2014-11-18 11:33:31 +0100265 tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1])));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000266 }
267
Konrad Grochowski240120c2014-11-18 11:33:31 +0100268 for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000269 threadManager->add(*ix);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000270 }
271
Konrad Grochowski240120c2014-11-18 11:33:31 +0100272 if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000273 throw TException("Unexpected pending task count");
274 }
275
Konrad Grochowski240120c2014-11-18 11:33:31 +0100276 shared_ptr<ThreadManagerTests::BlockTask> extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000277
278 try {
279 threadManager->add(extraTask, 1);
280 throw TException("Unexpected success adding task in excess of pending task count");
Konrad Grochowski240120c2014-11-18 11:33:31 +0100281 } catch(TooManyPendingTasksException& e) {
David Reiss9fcacc82009-06-04 00:32:54 +0000282 throw TException("Should have timed out adding task in excess of pending task count");
Konrad Grochowski240120c2014-11-18 11:33:31 +0100283 } catch(TimedOutException& e) {
David Reiss9fcacc82009-06-04 00:32:54 +0000284 // Expected result
285 }
286
287 try {
288 threadManager->add(extraTask, -1);
289 throw TException("Unexpected success adding task in excess of pending task count");
Konrad Grochowski240120c2014-11-18 11:33:31 +0100290 } catch(TimedOutException& e) {
David Reiss9fcacc82009-06-04 00:32:54 +0000291 throw TException("Unexpected timeout adding task in excess of pending task count");
Konrad Grochowski240120c2014-11-18 11:33:31 +0100292 } catch(TooManyPendingTasksException& e) {
David Reiss9fcacc82009-06-04 00:32:54 +0000293 // Expected result
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000294 }
295
Konrad Grochowski240120c2014-11-18 11:33:31 +0100296 std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000297
298 {
299 Synchronized s(bmonitor);
300
301 bmonitor.notifyAll();
302 }
303
304 {
305 Synchronized s(monitor);
306
Konrad Grochowski240120c2014-11-18 11:33:31 +0100307 while(activeCounts[0] != 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000308 monitor.wait();
309 }
310 }
311
Konrad Grochowski240120c2014-11-18 11:33:31 +0100312 std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000313
314 try {
315 threadManager->add(extraTask, 1);
Konrad Grochowski240120c2014-11-18 11:33:31 +0100316 } catch(TimedOutException& e) {
317 std::cout << "\t\t\t" << "add timed out unexpectedly" << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000318 throw TException("Unexpected timeout adding task");
319
Konrad Grochowski240120c2014-11-18 11:33:31 +0100320 } catch(TooManyPendingTasksException& e) {
321 std::cout << "\t\t\t" << "add encountered too many pending exepctions" << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000322 throw TException("Unexpected timeout adding task");
323 }
324
325 // Wake up tasks that were pending before and wait for them to complete
326
327 {
328 Synchronized s(bmonitor);
329
330 bmonitor.notifyAll();
331 }
332
333 {
334 Synchronized s(monitor);
335
Konrad Grochowski240120c2014-11-18 11:33:31 +0100336 while(activeCounts[1] != 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000337 monitor.wait();
338 }
339 }
340
341 // Wake up the extra task and wait for it to complete
342
343 {
344 Synchronized s(bmonitor);
345
346 bmonitor.notifyAll();
347 }
348
349 {
350 Synchronized s(monitor);
351
Konrad Grochowski240120c2014-11-18 11:33:31 +0100352 while(activeCounts[2] != 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000353 monitor.wait();
354 }
355 }
356
Konrad Grochowski240120c2014-11-18 11:33:31 +0100357 if(!(success = (threadManager->totalTaskCount() == 0))) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000358 throw TException("Unexpected pending task count");
359 }
360
Konrad Grochowski240120c2014-11-18 11:33:31 +0100361 } catch(TException& e) {
David Reiss9fcacc82009-06-04 00:32:54 +0000362 std::cout << "ERROR: " << e.what() << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000363 }
364
365 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
366 return success;
Konrad Grochowski240120c2014-11-18 11:33:31 +0100367 }
Marc Slemko740343d2006-07-20 00:31:02 +0000368};
Marc Slemko6f038a72006-08-03 18:58:09 +0000369
Konrad Grochowski293a40e2014-09-04 17:28:17 +0400370const double ThreadManagerTests::TEST_TOLERANCE = .20;
Konrad Grochowski240120c2014-11-18 11:33:31 +0100371
}}}} // apache::thrift::concurrency::test
Marc Slemko740343d2006-07-20 00:31:02 +0000373
T Jake Lucianib5e62212009-01-31 22:36:20 +0000374using namespace apache::thrift::concurrency::test;