/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
module client_pool_test;

import core.time : Duration, dur;
import core.thread : Thread;
import std.algorithm;
import std.array;
import std.conv;
import std.exception;
import std.getopt;
import std.range;
import std.stdio;
import std.typecons;
import thrift.base;
import thrift.async.libevent;
import thrift.async.socket;
import thrift.codegen.base;
import thrift.codegen.async_client;
import thrift.codegen.async_client_pool;
import thrift.codegen.client;
import thrift.codegen.client_pool;
import thrift.codegen.processor;
import thrift.protocol.binary;
import thrift.server.simple;
import thrift.server.transport.socket;
import thrift.transport.buffered;
import thrift.transport.socket;
import thrift.util.cancellation;
import thrift.util.future;

// We use this as our RPC-layer exception here to make sure socket/… problems
// (that would usually be considered RPC layer faults) cause the tests to
// fail, even though we are testing the RPC exception handling.
class TestServiceException : TException {
  int port;
}

interface TestService {
  int getPort();
  alias .TestServiceException TestServiceException;
  enum methodMeta = [TMethodMeta("getPort", [],
    [TExceptionMeta("a", 1, "TestServiceException")])];
}

// Use some derived service, just to check that the pools handle inheritance
// correctly.
interface ExTestService : TestService {
  int[] getPortInArray();
  enum methodMeta = [TMethodMeta("getPortInArray", [],
    [TExceptionMeta("a", 1, "TestServiceException")])];
}

class ExTestHandler : ExTestService {
  this(ushort port, Duration delay, bool failing, bool trace) {
    this.port = port;
    this.delay = delay;
    this.failing = failing;
    this.trace = trace;
  }

  override int getPort() {
    if (trace) {
      stderr.writefln("getPort() called on %s (delay: %s, failing: %s)", port,
        delay, failing);
    }
    sleep();
    failIfEnabled();
    return port;
  }

  override int[] getPortInArray() {
    return [getPort()];
  }

  ushort port;
  Duration delay;
  bool failing;
  bool trace;

private:
  void sleep() {
    if (delay > dur!"hnsecs"(0)) Thread.sleep(delay);
  }

  void failIfEnabled() {
    if (!failing) return;

    auto e = new TestServiceException;
    e.port = port;
    throw e;
  }
}

class ServerThread : Thread {
  this(ExTestHandler handler, TCancellation cancellation) {
    super(&run);
    handler_ = handler;
    cancellation_ = cancellation;
  }
private:
  void run() {
    try {
      auto protocolFactory = new TBinaryProtocolFactory!();
      auto processor = new TServiceProcessor!ExTestService(handler_);
      auto serverTransport = new TServerSocket(handler_.port);
      serverTransport.recvTimeout = dur!"seconds"(3);
      auto transportFactory = new TBufferedTransportFactory;

      auto server = new TSimpleServer(
        processor, serverTransport, transportFactory, protocolFactory);
      server.serve(cancellation_);
    } catch (Exception e) {
      writefln("Server thread on port %s failed: %s", handler_.port, e);
    }
  }

  TCancellation cancellation_;
  ExTestHandler handler_;
}

void main(string[] args) {
  bool trace;
  ushort port = 9090;
  getopt(args, "port", &port, "trace", &trace);

  auto serverCancellation = new TCancellationOrigin;
  scope (exit) serverCancellation.trigger();

  immutable ports = cast(immutable)array(map!"cast(ushort)a"(iota(port, port + 6)));

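  // Handler layout: the first three servers answer successfully with
  // increasing delay (1, 10, 100 ms), the last three always throw
  // TestServiceException. The pool tests below rely on this ordering
  // (e.g. clients[3 .. $] are the always-failing ones).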
version (none) {
  // Cannot use this due to multiple DMD @@BUG@@s:
  // 1. »function D main is a nested function and cannot be accessed from array«
  //    when calling array() on the result of the outer map() – would have to
  //    manually do the eager evaluation/array conversion.
  // 2. »Zip.opSlice cannot get frame pointer to map« for the delay argument,
  //    can be worked around by calling array() on the map result first.
  // 3. Even when using the workarounds for the last two points, the DMD-built
  //    executable crashes when building without (sic!) inlining enabled,
  //    the backtrace points into the first delegate literal.
  auto handlers = array(map!((args){
    return new ExTestHandler(args._0, args._1, args._2, trace);
  })(zip(
    ports,
    map!((a){ return dur!`msecs`(a); })([1, 10, 100, 1, 10, 100]),
    [false, false, false, true, true, true]
  )));
} else {
  auto handlers = [
    new ExTestHandler(cast(ushort)(port + 0), dur!"msecs"(1), false, trace),
    new ExTestHandler(cast(ushort)(port + 1), dur!"msecs"(10), false, trace),
    new ExTestHandler(cast(ushort)(port + 2), dur!"msecs"(100), false, trace),
    new ExTestHandler(cast(ushort)(port + 3), dur!"msecs"(1), true, trace),
    new ExTestHandler(cast(ushort)(port + 4), dur!"msecs"(10), true, trace),
    new ExTestHandler(cast(ushort)(port + 5), dur!"msecs"(100), true, trace)
  ];
}

  // Fire up the server threads.
  foreach (h; handlers) (new ServerThread(h, serverCancellation)).start();

  // Give the servers some time to get up. This should really be accomplished
  // via a barrier here and in the preServe() hook.
  Thread.sleep(dur!"msecs"(10));

  syncClientPoolTest(ports, handlers);
  asyncClientPoolTest(ports, handlers);
  asyncFastestClientPoolTest(ports, handlers);
  asyncAggregatorTest(ports, handlers);
}


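// Exercises the synchronous TClientPool: clients are tried in their given
// order, RPC-level faults from all clients are reported as a
// TCompoundOperationException, and a client that has failed too often is
// temporarily taken out of rotation.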
void syncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
  auto clients = array(map!((a){
    return cast(TClientBase!ExTestService)tClient!ExTestService(
      tBinaryProtocol(new TSocket("127.0.0.1", a))
    );
  })(ports));

  scope(exit) foreach (c; clients) c.outputProtocol.transport.close();

  // Try the case where the first client succeeds.
  {
    enforce(makePool(clients).getPort() == ports[0]);
  }

  // Try the case where all clients fail.
  {
    auto pool = makePool(clients[3 .. $]);
    auto e = cast(TCompoundOperationException)collectException(pool.getPort());
    enforce(e);
    enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
      ports[3 .. $]));
  }

  // Try the case where the first clients fail, but a later one succeeds.
  {
    auto pool = makePool(clients[3 .. $] ~ clients[0 .. 3]);
    enforce(pool.getPortInArray() == [ports[0]]);
  }

  // Make sure a client is properly deactivated when it has failed too often.
  {
    auto pool = makePool(clients);
    pool.faultDisableCount = 1;
    pool.faultDisableDuration = dur!"msecs"(50);

    handlers[0].failing = true;
    enforce(pool.getPort() == ports[1]);

    handlers[0].failing = false;
    enforce(pool.getPort() == ports[1]);

    Thread.sleep(dur!"msecs"(50));
    enforce(pool.getPort() == ports[0]);
  }
}

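// Builds a synchronous client pool with a deterministic (non-permuted) client
// order and a fault filter that only treats TestServiceException as an
// RPC-level fault, so socket/transport errors still fail the tests.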
auto makePool(TClientBase!ExTestService[] clients) {
  auto p = tClientPool(clients);
  p.permuteClients = false;
  p.rpcFaultFilter = (Exception e) {
    return (cast(TestServiceException)e !is null);
  };
  return p;
}


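// Runs the same scenarios as syncClientPoolTest, but against a
// TAsyncClientPool driven by the libevent-based async manager.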
void asyncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
  auto manager = new TLibeventAsyncManager;
  scope (exit) manager.stop(dur!"hnsecs"(0));

  auto clients = makeAsyncClients(manager, ports);
  scope(exit) foreach (c; clients) c.transport.close();

  // Try the case where the first client succeeds.
  {
    enforce(makeAsyncPool(clients).getPort() == ports[0]);
  }

  // Try the case where all clients fail.
  {
    auto pool = makeAsyncPool(clients[3 .. $]);
    auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
    enforce(e);
    enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
      ports[3 .. $]));
  }

  // Try the case where the first clients fail, but a later one succeeds.
  {
    auto pool = makeAsyncPool(clients[3 .. $] ~ clients[0 .. 3]);
    enforce(pool.getPortInArray() == [ports[0]]);
  }

  // Make sure a client is properly deactivated when it has failed too often.
  {
    auto pool = makeAsyncPool(clients);
    pool.faultDisableCount = 1;
    pool.faultDisableDuration = dur!"msecs"(50);

    handlers[0].failing = true;
    enforce(pool.getPort() == ports[1]);

    handlers[0].failing = false;
    enforce(pool.getPort() == ports[1]);

    Thread.sleep(dur!"msecs"(50));
    enforce(pool.getPort() == ports[0]);
  }
}

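// Async counterpart of makePool: deterministic client order, and only
// TestServiceException counts as an RPC-level fault.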
auto makeAsyncPool(TAsyncClientBase!ExTestService[] clients) {
  auto p = tAsyncClientPool(clients);
  p.permuteClients = false;
  p.rpcFaultFilter = (Exception e) {
    return (cast(TestServiceException)e !is null);
  };
  return p;
}

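// Creates one TAsyncClient per port, all sharing the given async manager.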
auto makeAsyncClients(TLibeventAsyncManager manager, in ushort[] ports) {
  // DMD @@BUG@@ workaround: Using array() on the lazyClients map result leads
  // to »function D main is a nested function and cannot be accessed from array«.
  // Thus, we manually do the array conversion.
  auto lazyClients = map!((a){
    return new TAsyncClient!ExTestService(
      new TAsyncSocket(manager, "127.0.0.1", a),
      new TBufferedTransportFactory,
      new TBinaryProtocolFactory!(TBufferedTransport)
    );
  })(ports);
  TAsyncClientBase!ExTestService[] clients;
  foreach (c; lazyClients) clients ~= c;
  return clients;
}


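// Exercises TAsyncFastestClientPool, where all clients are invoked and the
// first successful response wins, regardless of the order they were passed in.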
void asyncFastestClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
  auto manager = new TLibeventAsyncManager;
  scope (exit) manager.stop(dur!"hnsecs"(0));

  auto clients = makeAsyncClients(manager, ports);
  scope(exit) foreach (c; clients) c.transport.close();

  // Make sure the fastest client wins, even if they are called in some other
  // order.
  {
    auto result = makeAsyncFastestPool(array(retro(clients))).getPort().waitGet();
    enforce(result == ports[0]);
  }

  // Try the case where all clients fail.
  {
    auto pool = makeAsyncFastestPool(clients[3 .. $]);
    auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
    enforce(e);
    enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
      ports[3 .. $]));
  }

  // Try the case where the first clients fail, but a later one succeeds.
  {
    auto pool = makeAsyncFastestPool(clients[1 .. $]);
    enforce(pool.getPortInArray() == [ports[1]]);
  }
}

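// Like makeAsyncPool, but backed by tAsyncFastestClientPool; again, only
// TestServiceException is treated as an RPC-level fault.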
auto makeAsyncFastestPool(TAsyncClientBase!ExTestService[] clients) {
  auto p = tAsyncFastestClientPool(clients);
  p.rpcFaultFilter = (Exception e) {
    return (cast(TestServiceException)e !is null);
  };
  return p;
}


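// Exercises TAsyncAggregator: a call is dispatched to all clients and the
// individual results and exceptions are collected through the range and
// accumulate() interfaces.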
void asyncAggregatorTest(const(ushort)[] ports, ExTestHandler[] handlers) {
  auto manager = new TLibeventAsyncManager;
  scope (exit) manager.stop(dur!"hnsecs"(0));

  auto clients = makeAsyncClients(manager, ports);
  scope(exit) foreach (c; clients) c.transport.close();

  auto aggregator = tAsyncAggregator(
    cast(TAsyncClientBase!ExTestService[])clients);

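  // Timing expectation for the checks below: within the 50 ms range window,
  // the 1 ms and 10 ms servers (both the succeeding and the failing ones) have
  // answered, while the two 100 ms servers have not – hence two results, two
  // exceptions, and completedCount == 4.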
  // Test aggregator range interface.
  {
    auto range = aggregator.getPort().range(dur!"msecs"(50));
    enforce(equal(range, ports[0 .. 2][]));
    enforce(equal(map!"a.port"(cast(TestServiceException[])range.exceptions),
      ports[3 .. $ - 1]));
    enforce(range.completedCount == 4);
  }

  // Test default accumulator for scalars.
  {
    auto fullResult = aggregator.getPort().accumulate();
    enforce(fullResult.waitGet() == ports[0 .. 3]);

    auto partialResult = aggregator.getPort().accumulate();
    Thread.sleep(dur!"msecs"(20));
    enforce(partialResult.finishGet() == ports[0 .. 2]);
  }

  // Test default accumulator for arrays.
  {
    auto fullResult = aggregator.getPortInArray().accumulate();
    enforce(fullResult.waitGet() == ports[0 .. 3]);

    auto partialResult = aggregator.getPortInArray().accumulate();
    Thread.sleep(dur!"msecs"(20));
    enforce(partialResult.finishGet() == ports[0 .. 2]);
  }

  // Test custom accumulator.
  {
    auto fullResult = aggregator.getPort().accumulate!(function(int[] results){
      return reduce!"a + b"(results);
    })();
    enforce(fullResult.waitGet() == ports[0] + ports[1] + ports[2]);

    auto partialResult = aggregator.getPort().accumulate!(
      function(int[] results, Exception[] exceptions) {
        // Return a tuple of the parameters so we can check them outside of
        // this function (to verify the values, we need access to »ports«, but
        // due to DMD @@BUG5710@@, we can't use a delegate literal).
        return tuple(results, exceptions);
      }
    )();
    Thread.sleep(dur!"msecs"(20));
    auto resultTuple = partialResult.finishGet();
    enforce(resultTuple._0 == ports[0 .. 2]);
    enforce(equal(map!"a.port"(cast(TestServiceException[])resultTuple._1),
      ports[3 .. $ - 1]));
  }
}