| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| module thrift.util.awaitable; |
| |
| import core.sync.condition; |
| import core.sync.mutex; |
| import core.time : Duration; |
| import std.exception : enforce; |
| import std.socket/+ : Socket, socketPair+/; // DMD @@BUG314@@ |
| import thrift.base; |
| |
| // To avoid DMD @@BUG6395@@. |
| import thrift.internal.algorithm; |
| |
| /** |
| * An event that can occur at some point in the future and which can be |
| * awaited, either by blocking until it occurs, or by registering a callback |
| * delegate. |
| */ |
| interface TAwaitable { |
| /** |
| * Waits until the event occurs. |
| * |
| * Calling wait() for an event that has already occurred is a no-op. |
| */ |
| void wait(); |
| |
| /** |
| * Waits until the event occurs or the specified timeout expires. |
| * |
| * Calling wait() for an event that has already occurred is a no-op. |
| * |
| * Returns: Whether the event was triggered before the timeout expired. |
| */ |
| bool wait(Duration timeout); |
| |
| /** |
| * Registers a callback that is called if the event occurs. |
| * |
| * The delegate will likely be invoked from a different thread, and is |
| * expected not to perform expensive work as it will usually be invoked |
| * synchronously by the notifying thread. The order in which registered |
| * callbacks are invoked is not specified. |
| * |
| * The callback must never throw, but nothrow semantics are difficult to |
| * enforce, so currently exceptions are just swallowed by |
| * TAwaitable implementations. |
| * |
| * If the event has already occurred, the delegate is immediately executed |
| * in the current thread. |
| */ |
| void addCallback(void delegate() dg); |
| |
| /** |
| * Removes a previously added callback. |
| * |
| * Returns: Whether the callback could be found in the list, i.e. whether it |
| * was previously added. |
| */ |
| bool removeCallback(void delegate() dg); |
| } |
| |
| /** |
| * A simple TAwaitable event triggered by just calling a trigger() method. |
| */ |
| class TOneshotEvent : TAwaitable { |
| this() { |
| mutex_ = new Mutex; |
| condition_ = new Condition(mutex_); |
| } |
| |
| override void wait() { |
| synchronized (mutex_) { |
| while (!triggered_) condition_.wait(); |
| } |
| } |
| |
| override bool wait(Duration timeout) { |
| synchronized (mutex_) { |
| if (triggered_) return true; |
| condition_.wait(timeout); |
| return triggered_; |
| } |
| } |
| |
| override void addCallback(void delegate() dg) { |
| mutex_.lock(); |
| scope (failure) mutex_.unlock(); |
| |
| callbacks_ ~= dg; |
| |
| if (triggered_) { |
| mutex_.unlock(); |
| dg(); |
| return; |
| } |
| |
| mutex_.unlock(); |
| } |
| |
| override bool removeCallback(void delegate() dg) { |
| synchronized (mutex_) { |
| auto oldLength = callbacks_.length; |
| callbacks_ = removeEqual(callbacks_, dg); |
| return callbacks_.length < oldLength; |
| } |
| } |
| |
| /** |
| * Triggers the event. |
| * |
| * Any registered event callbacks are executed synchronously before the |
| * function returns. |
| */ |
| void trigger() { |
| synchronized (mutex_) { |
| if (!triggered_) { |
| triggered_ = true; |
| condition_.notifyAll(); |
| foreach (c; callbacks_) c(); |
| } |
| } |
| } |
| |
| private: |
| bool triggered_; |
| Mutex mutex_; |
| Condition condition_; |
| void delegate()[] callbacks_; |
| } |
| |
| /** |
| * Translates TAwaitable events into dummy messages on a socket that can be |
| * used e.g. to wake up from a select() call. |
| */ |
| final class TSocketNotifier { |
| this() { |
| auto socks = socketPair(); |
| foreach (s; socks) s.blocking = false; |
| sendSocket_ = socks[0]; |
| recvSocket_ = socks[1]; |
| } |
| |
| /** |
| * The socket the messages will be sent to. |
| */ |
| Socket socket() @property { |
| return recvSocket_; |
| } |
| |
| /** |
| * Atatches the socket notifier to the specified awaitable, causing it to |
| * write a byte to the notification socket when the awaitable callbacks are |
| * invoked. |
| * |
| * If the event has already been triggered, the dummy byte is written |
| * immediately to the socket. |
| * |
| * A socket notifier can only be attached to a single awaitable at a time. |
| * |
| * Throws: TException if the socket notifier is already attached. |
| */ |
| void attach(TAwaitable awaitable) { |
| enforce(!awaitable_, new TException("Already attached.")); |
| awaitable.addCallback(¬ify); |
| awaitable_ = awaitable; |
| } |
| |
| /** |
| * Detaches the socket notifier from the awaitable it is currently attached |
| * to. |
| * |
| * Throws: TException if the socket notifier is not currently attached. |
| */ |
| void detach() { |
| enforce(awaitable_, new TException("Not attached.")); |
| |
| // Soak up any not currently read notification bytes. |
| ubyte[1] dummy = void; |
| while (recvSocket_.receive(dummy) != Socket.ERROR) {} |
| |
| auto couldRemove = awaitable_.removeCallback(¬ify); |
| assert(couldRemove); |
| awaitable_ = null; |
| } |
| |
| private: |
| void notify() { |
| ubyte[1] zero; |
| sendSocket_.send(zero); |
| } |
| |
| TAwaitable awaitable_; |
| Socket sendSocket_; |
| Socket recvSocket_; |
| } |