THRIFT-4666: Attempt to work around dlang client pool test failure
diff --git a/lib/d/test/client_pool_test.d b/lib/d/test/client_pool_test.d
index 52207d9..b24c97a 100644
--- a/lib/d/test/client_pool_test.d
+++ b/lib/d/test/client_pool_test.d
@@ -18,6 +18,7 @@
*/
module client_pool_test;
+import core.sync.semaphore : Semaphore;
import core.time : Duration, dur;
import core.thread : Thread;
import std.algorithm;
@@ -28,6 +29,7 @@
import std.range;
import std.stdio;
import std.typecons;
+import std.variant : Variant;
import thrift.base;
import thrift.async.libevent;
import thrift.async.socket;
@@ -37,9 +39,12 @@
import thrift.codegen.client;
import thrift.codegen.client_pool;
import thrift.codegen.processor;
+import thrift.protocol.base;
import thrift.protocol.binary;
+import thrift.server.base;
import thrift.server.simple;
import thrift.server.transport.socket;
+import thrift.transport.base;
import thrift.transport.buffered;
import thrift.transport.socket;
import thrift.util.cancellation;
@@ -108,11 +113,29 @@
}
}
+class ServerPreServeHandler : TServerEventHandler {
+ this(Semaphore sem) {
+ sem_ = sem;
+ }
+
+ override void preServe() {
+ sem_.notify();
+ }
+
+ Variant createContext(TProtocol input, TProtocol output) { return Variant.init; }
+ void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {}
+ void preProcess(Variant serverContext, TTransport transport) {}
+
+private:
+ Semaphore sem_;
+}
+
class ServerThread : Thread {
- this(ExTestHandler handler, TCancellation cancellation) {
+ this(ExTestHandler handler, ServerPreServeHandler serverHandler, TCancellation cancellation) {
super(&run);
handler_ = handler;
cancellation_ = cancellation;
+ serverHandler_ = serverHandler;
}
private:
void run() {
@@ -123,16 +146,17 @@
serverTransport.recvTimeout = dur!"seconds"(3);
auto transportFactory = new TBufferedTransportFactory;
- auto server = new TSimpleServer(
- processor, serverTransport, transportFactory, protocolFactory);
+ auto server = new TSimpleServer(processor, serverTransport, transportFactory, protocolFactory);
+ server.eventHandler = serverHandler_;
server.serve(cancellation_);
} catch (Exception e) {
writefln("Server thread on port %s failed: %s", handler_.port, e);
}
}
- TCancellation cancellation_;
ExTestHandler handler_;
+ ServerPreServeHandler serverHandler_;
+ TCancellation cancellation_;
}
void main(string[] args) {
@@ -145,6 +169,9 @@
immutable ports = cast(immutable)array(map!"cast(ushort)a"(iota(port, port + 6)));
+ // semaphore that will be incremented whenever each server thread has bound and started listening
+ Semaphore sem = new Semaphore(0);
+
version (none) {
// Cannot use this due to multiple DMD @@BUG@@s:
// 1. »function D main is a nested function and cannot be accessed from array«
@@ -174,11 +201,10 @@
}
// Fire up the server threads.
- foreach (h; handlers) (new ServerThread(h, serverCancellation)).start();
+ foreach (h; handlers) (new ServerThread(h, new ServerPreServeHandler(sem), serverCancellation)).start();
- // Give the servers some time to get up. This should really be accomplished
- // via a barrier here and in the preServe() hook.
- Thread.sleep(dur!"msecs"(10));
+ // wait until all the handlers signal that they're ready to serve
+ foreach (h; handlers) (sem.wait(dur!`seconds`(1)));
syncClientPoolTest(ports, handlers);
asyncClientPoolTest(ports, handlers);