blob: a8fdcdacae6c6c21e8dd819e535d9f0f3d25b6ee [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Marc Slemkoc7782972006-07-25 02:26:35 +00007#include <config.h>
Marc Slemko6f038a72006-08-03 18:58:09 +00008#include <concurrency/ThreadManager.h>
9#include <concurrency/PosixThreadFactory.h>
10#include <concurrency/Monitor.h>
11#include <concurrency/Util.h>
Marc Slemko740343d2006-07-20 00:31:02 +000012
13#include <assert.h>
14#include <set>
15#include <iostream>
Marc Slemkoc7782972006-07-25 02:26:35 +000016#include <set>
17#include <stdint.h>
Marc Slemko740343d2006-07-20 00:31:02 +000018
19namespace facebook { namespace thrift { namespace concurrency { namespace test {
20
21using namespace facebook::thrift::concurrency;
22
Mark Sleef5f2be42006-09-05 21:05:31 +000023/**
Marc Slemko3a3b53b2007-05-22 23:59:54 +000024 * ThreadManagerTests class
Mark Sleef5f2be42006-09-05 21:05:31 +000025 *
26 * @author marc
27 * @version $Id:$
28 */
Marc Slemko740343d2006-07-20 00:31:02 +000029class ThreadManagerTests {
30
31public:
32
Marc Slemko6f038a72006-08-03 18:58:09 +000033 static const double ERROR;
34
Marc Slemko740343d2006-07-20 00:31:02 +000035 class Task: public Runnable {
36
37 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000038
Mark Slee9b82d272007-05-23 05:16:07 +000039 Task(Monitor& monitor, size_t& count, int64_t timeout) :
Marc Slemko740343d2006-07-20 00:31:02 +000040 _monitor(monitor),
41 _count(count),
42 _timeout(timeout),
Marc Slemko740343d2006-07-20 00:31:02 +000043 _done(false) {}
44
45 void run() {
46
Marc Slemkoc7782972006-07-25 02:26:35 +000047 _startTime = Util::currentTime();
Marc Slemko740343d2006-07-20 00:31:02 +000048
Mark Sleef5f2be42006-09-05 21:05:31 +000049 {
50 Synchronized s(_sleep);
Marc Slemko740343d2006-07-20 00:31:02 +000051
Marc Slemko3a3b53b2007-05-22 23:59:54 +000052 try {
53 _sleep.wait(_timeout);
54 } catch(TimedOutException& e) {
55 ;
56 }catch(...) {
57 assert(0);
58 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000059 }
Marc Slemko740343d2006-07-20 00:31:02 +000060
Marc Slemkoc7782972006-07-25 02:26:35 +000061 _endTime = Util::currentTime();
62
Marc Slemko740343d2006-07-20 00:31:02 +000063 _done = true;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000064
Mark Sleef5f2be42006-09-05 21:05:31 +000065 {
66 Synchronized s(_monitor);
Marc Slemko740343d2006-07-20 00:31:02 +000067
68 // std::cout << "Thread " << _count << " completed " << std::endl;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000069
Marc Slemko740343d2006-07-20 00:31:02 +000070 _count--;
71
Mark Sleef5f2be42006-09-05 21:05:31 +000072 if (_count == 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000073
Marc Slemko740343d2006-07-20 00:31:02 +000074 _monitor.notify();
75 }
76 }
77 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000078
Marc Slemko740343d2006-07-20 00:31:02 +000079 Monitor& _monitor;
80 size_t& _count;
Mark Slee9b82d272007-05-23 05:16:07 +000081 int64_t _timeout;
82 int64_t _startTime;
83 int64_t _endTime;
Marc Slemko740343d2006-07-20 00:31:02 +000084 bool _done;
Marc Slemkoc7782972006-07-25 02:26:35 +000085 Monitor _sleep;
Marc Slemko740343d2006-07-20 00:31:02 +000086 };
87
Mark Sleef5f2be42006-09-05 21:05:31 +000088 /**
89 * Dispatch count tasks, each of which blocks for timeout milliseconds then
90 * completes. Verify that all tasks completed and that thread manager cleans
91 * up properly on delete.
92 */
Mark Slee9b82d272007-05-23 05:16:07 +000093 bool loadTest(size_t count=100, int64_t timeout=100LL, size_t workerCount=4) {
Marc Slemko740343d2006-07-20 00:31:02 +000094
95 Monitor monitor;
96
97 size_t activeCount = count;
98
Marc Slemko6f038a72006-08-03 18:58:09 +000099 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
Marc Slemkoc7782972006-07-25 02:26:35 +0000100
Marc Slemko6f038a72006-08-03 18:58:09 +0000101 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
Marc Slemkoc7782972006-07-25 02:26:35 +0000102
Marc Slemkoa6479032007-06-05 22:20:14 +0000103 threadFactory->setPriority(PosixThreadFactory::HIGHEST);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000104
Marc Slemkoc7782972006-07-25 02:26:35 +0000105 threadManager->threadFactory(threadFactory);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000106
107 threadManager->start();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000108
Marc Slemko6f038a72006-08-03 18:58:09 +0000109 std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
Marc Slemko740343d2006-07-20 00:31:02 +0000110
Mark Sleef5f2be42006-09-05 21:05:31 +0000111 for (size_t ix = 0; ix < count; ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000112
Marc Slemko6f038a72006-08-03 18:58:09 +0000113 tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));
Marc Slemko740343d2006-07-20 00:31:02 +0000114 }
115
Mark Slee9b82d272007-05-23 05:16:07 +0000116 int64_t time00 = Util::currentTime();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000117
Mark Sleef5f2be42006-09-05 21:05:31 +0000118 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
Marc Slemko740343d2006-07-20 00:31:02 +0000119
120 threadManager->add(*ix);
121 }
122
Mark Sleef5f2be42006-09-05 21:05:31 +0000123 {
124 Synchronized s(monitor);
125
Marc Slemko740343d2006-07-20 00:31:02 +0000126 while(activeCount > 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000127
Marc Slemko740343d2006-07-20 00:31:02 +0000128 monitor.wait();
129 }
130 }
131
Mark Slee9b82d272007-05-23 05:16:07 +0000132 int64_t time01 = Util::currentTime();
Marc Slemko740343d2006-07-20 00:31:02 +0000133
Mark Slee9b82d272007-05-23 05:16:07 +0000134 int64_t firstTime = 9223372036854775807LL;
135 int64_t lastTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000136
137 double averageTime = 0;
Mark Slee9b82d272007-05-23 05:16:07 +0000138 int64_t minTime = 9223372036854775807LL;
139 int64_t maxTime = 0;
Marc Slemkoc7782972006-07-25 02:26:35 +0000140
Mark Sleef5f2be42006-09-05 21:05:31 +0000141 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000142
Marc Slemko6f038a72006-08-03 18:58:09 +0000143 shared_ptr<ThreadManagerTests::Task> task = *ix;
Marc Slemkoc7782972006-07-25 02:26:35 +0000144
Mark Slee9b82d272007-05-23 05:16:07 +0000145 int64_t delta = task->_endTime - task->_startTime;
Marc Slemkoc7782972006-07-25 02:26:35 +0000146
147 assert(delta > 0);
148
Mark Sleef5f2be42006-09-05 21:05:31 +0000149 if (task->_startTime < firstTime) {
Marc Slemkoc7782972006-07-25 02:26:35 +0000150 firstTime = task->_startTime;
151 }
152
Mark Sleef5f2be42006-09-05 21:05:31 +0000153 if (task->_endTime > lastTime) {
Marc Slemkoc7782972006-07-25 02:26:35 +0000154 lastTime = task->_endTime;
155 }
156
Mark Sleef5f2be42006-09-05 21:05:31 +0000157 if (delta < minTime) {
Marc Slemkoc7782972006-07-25 02:26:35 +0000158 minTime = delta;
159 }
160
Mark Sleef5f2be42006-09-05 21:05:31 +0000161 if (delta > maxTime) {
Marc Slemkoc7782972006-07-25 02:26:35 +0000162 maxTime = delta;
163 }
164
165 averageTime+= delta;
Marc Slemko740343d2006-07-20 00:31:02 +0000166 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000167
Marc Slemkoc7782972006-07-25 02:26:35 +0000168 averageTime /= count;
169
170 std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl;
Marc Slemko740343d2006-07-20 00:31:02 +0000171
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000172 double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout;
173
174 double error = ((time01 - time00) - expectedTime) / expectedTime;
175
Mark Sleef5f2be42006-09-05 21:05:31 +0000176 if (error < 0) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000177 error*= -1.0;
178 }
179
Marc Slemko6f038a72006-08-03 18:58:09 +0000180 bool success = error < ERROR;
Marc Slemkoc7782972006-07-25 02:26:35 +0000181
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000182 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl;
Marc Slemko740343d2006-07-20 00:31:02 +0000183
Marc Slemkoc7782972006-07-25 02:26:35 +0000184 return success;
Marc Slemko740343d2006-07-20 00:31:02 +0000185 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000186
187 class BlockTask: public Runnable {
188
189 public:
190
191 BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) :
192 _monitor(monitor),
193 _bmonitor(bmonitor),
194 _count(count) {}
195
196 void run() {
197 {
198 Synchronized s(_bmonitor);
199
200 _bmonitor.wait();
201
202 }
203
204 {
205 Synchronized s(_monitor);
206
207 _count--;
208
209 if (_count == 0) {
210
211 _monitor.notify();
212 }
213 }
214 }
215
216 Monitor& _monitor;
217 Monitor& _bmonitor;
218 size_t& _count;
219 };
220
221 /**
222 * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the
223 * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */
224
Mark Slee9b82d272007-05-23 05:16:07 +0000225 bool blockTest(int64_t timeout=100LL, size_t workerCount=2) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000226
227 bool success = false;
228
229 try {
230
231 Monitor bmonitor;
232 Monitor monitor;
233
234 size_t pendingTaskMaxCount = workerCount;
235
236 size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
237
238 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
239
240 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
241
Marc Slemkoa6479032007-06-05 22:20:14 +0000242 threadFactory->setPriority(PosixThreadFactory::HIGHEST);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000243
244 threadManager->threadFactory(threadFactory);
245
246 threadManager->start();
247
248 std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
249
250 for (size_t ix = 0; ix < workerCount; ix++) {
251
252 tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0])));
253 }
254
255 for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
256
257 tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1])));
258 }
259
260 for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
261 threadManager->add(*ix);
262 }
263
264 if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
265 throw TException("Unexpected pending task count");
266 }
267
268 shared_ptr<ThreadManagerTests::BlockTask> extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));
269
270 try {
271 threadManager->add(extraTask, 1);
272 throw TException("Unexpected success adding task in excess of pending task count");
273 } catch(TimedOutException& e) {
274 }
275
276 std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
277
278 {
279 Synchronized s(bmonitor);
280
281 bmonitor.notifyAll();
282 }
283
284 {
285 Synchronized s(monitor);
286
287 while(activeCounts[0] != 0) {
288 monitor.wait();
289 }
290 }
291
292 std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
293
294 try {
295 threadManager->add(extraTask, 1);
296 } catch(TimedOutException& e) {
297 std::cout << "\t\t\t" << "add timed out unexpectedly" << std::endl;
298 throw TException("Unexpected timeout adding task");
299
300 } catch(TooManyPendingTasksException& e) {
301 std::cout << "\t\t\t" << "add encountered too many pending exepctions" << std::endl;
302 throw TException("Unexpected timeout adding task");
303 }
304
305 // Wake up tasks that were pending before and wait for them to complete
306
307 {
308 Synchronized s(bmonitor);
309
310 bmonitor.notifyAll();
311 }
312
313 {
314 Synchronized s(monitor);
315
316 while(activeCounts[1] != 0) {
317 monitor.wait();
318 }
319 }
320
321 // Wake up the extra task and wait for it to complete
322
323 {
324 Synchronized s(bmonitor);
325
326 bmonitor.notifyAll();
327 }
328
329 {
330 Synchronized s(monitor);
331
332 while(activeCounts[2] != 0) {
333 monitor.wait();
334 }
335 }
336
337 if(!(success = (threadManager->totalTaskCount() == 0))) {
338 throw TException("Unexpected pending task count");
339 }
340
341 } catch(TException& e) {
342 }
343
344 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
345 return success;
346 }
Marc Slemko740343d2006-07-20 00:31:02 +0000347};
Marc Slemko6f038a72006-08-03 18:58:09 +0000348
349const double ThreadManagerTests::ERROR = .20;
350
Marc Slemko740343d2006-07-20 00:31:02 +0000351}}}} // facebook::thrift::concurrency
352
353using namespace facebook::thrift::concurrency::test;
354