| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /** |
| * Defines the interface used for client-side handling of asynchronous |
| * I/O operations, based on coroutines. |
| * |
| * The main piece of the »client side« (e.g. for TAsyncClient users) of the |
| * API is TFuture, which represents an asynchronously executed operation, |
| * which can have a return value, throw exceptions, and which can be waited |
| * upon. |
| * |
| * On the »implementation side«, the idea is that by using a TAsyncTransport |
| * instead of a normal TTransport and executing the work through a |
| * TAsyncManager, the same code as for synchronous I/O can be used for |
| * asynchronous operation as well, for example: |
| * |
| * --- |
| * auto socket = new TAsyncSocket(someTAsyncSocketManager(), host, port); |
| * // … |
| * socket.asyncManager.execute(socket, { |
| * SomeThriftStruct s; |
| * |
| * // Waiting for socket I/O will not block an entire thread but cause |
| * // the async manager to execute another task in the meantime, because |
| * // we are using TAsyncSocket instead of TSocket. |
| * s.read(socket); |
| * |
| * // Do something with s, e.g. set a TPromise result to it. |
| * writeln(s); |
| * }); |
| * --- |
| */ |
| module thrift.async.base; |
| |
| import core.time : Duration, dur; |
| import std.socket/+ : Socket+/; // DMD @@BUG314@@ |
| import thrift.base; |
| import thrift.transport.base; |
| import thrift.util.cancellation; |
| |
| /** |
| * Manages one or more asynchronous transport resources (e.g. sockets in the |
| * case of TAsyncSocketManager) and allows work items to be submitted for them. |
| * |
| * Implementations will typically run one or more background threads for |
| * executing the work, which is one of the reasons for a TAsyncManager to be |
| * used. Each work item is run in its own fiber and is expected to yield() away |
| * while waiting for time-consuming operations. |
| * |
| * The second important purpose of TAsyncManager is to serialize access to |
| * the transport resources – without taking care of that, e.g. issuing multiple |
| * RPC calls over the same connection in rapid succession would likely lead to |
| * more than one request being written at the same time, causing only garbage |
| * to arrive at the remote end. |
| * |
| * All methods are thread-safe. |
| */ |
| interface TAsyncManager { |
| /** |
| * Submits a work item to be executed asynchronously. |
| * |
| * Access to async transports is serialized – if two work items associated |
| * with the same transport are submitted, the second delegate will not be |
| * invoked until the first has returned, even if the first one |
| * context-switches away (because it is waiting for I/O) and the async |
| * manager is idle otherwise. |
| * |
| * Optionally, a TCancellation instance can be specified. If present, |
| * triggering it will be considered a request to cancel the work item, if it |
| * is still waiting for the associated transport to become available. |
| * Delegates which are already being processed (i.e. waiting for I/O) are not |
| * affected because this would bring the connection into an undefined state |
| * (as probably a half-written request or a half-read response would be left |
| * behind). |
| * |
| * Params: |
| * transport = The TAsyncTransport the work delegate will operate on. Must |
| * be associated with this TAsyncManager instance; the in-contract of this |
| * method asserts that transport.asyncManager is this instance. |
| * work = The operations to execute on the given transport. Must never |
| * throw, errors should be handled in another way. nothrow semantics are |
| * difficult to enforce in combination with fibers though, so currently |
| * exceptions are just swallowed by TAsyncManager implementations. |
| * cancellation = If set, can be used to request cancellation of this work |
| * item if it is still waiting to be executed. Defaults to null, i.e. no |
| * cancellation support. |
| * |
| * Note: The work item will likely be executed in a different thread, so make |
| * sure the code it relies on is thread-safe. The async transports |
| * themselves are an exception, as access to them is serialized as noted |
| * above. |
| */ |
| void execute(TAsyncTransport transport, void delegate() work, |
| TCancellation cancellation = null |
| ) in { |
| assert(transport.asyncManager is this, |
| "The given transport must be associated with this TAsyncManager."); |
| } |
| |
| /** |
| * Submits a delegate to be executed after a certain amount of time has |
| * passed. |
| * |
| * The actual amount of time elapsed can be higher if the async manager |
| * instance is busy and thus should not be relied on. |
| * |
| * Params: |
| * duration = The amount of time to wait before starting to execute the |
| * work delegate. |
| * work = The code to execute after the specified amount of time has passed. |
| * |
| * Example: |
| * --- |
| * // A very basic example – usually, the actual work item would enqueue |
| * // some async transport operation. |
| * auto asyncManager = someAsyncManager(); |
| * |
| * TFuture!int calculate() { |
| * // Create a promise and asynchronously set its value after three |
| * // seconds have passed. |
| * auto promise = new TPromise!int; |
| * asyncManager.delay(dur!"seconds"(3), { |
| * promise.succeed(42); |
| * }); |
| * |
| * // Immediately return it to the caller. |
| * return promise; |
| * } |
| * |
| * // This will wait until the result is available and then print it. |
| * writeln(calculate().waitGet()); |
| * --- |
| */ |
| void delay(Duration duration, void delegate() work); |
| |
| /** |
| * Shuts down all background threads or other facilities that might have |
| * been started in order to execute work items. This function is typically |
| * called during program shutdown. |
| * |
| * If there are still tasks to be executed when the timeout expires, any |
| * currently executed work items will never receive any notifications |
| * for async transports managed by this instance, queued work items will |
| * be silently dropped, and implementations are allowed to leak resources. |
| * |
| * Params: |
| * waitFinishTimeout = If positive, waits for all work items to be |
| * finished for the specified amount of time, if negative, waits for |
| * completion without ever timing out, if zero, immediately shuts down |
| * the background facilities. The default is negative (-1 hnsecs), i.e. |
| * wait without timing out. |
| * |
| * Returns: NOTE(review): the meaning of the bool result is not specified |
| * here – presumably whether all work items were finished before the |
| * timeout expired; confirm against the implementations. |
| */ |
| bool stop(Duration waitFinishTimeout = dur!"hnsecs"(-1)); |
| } |
| |
| /** |
| * A TTransport which uses a TAsyncManager to schedule non-blocking operations. |
| * |
| * The actual type of device is not specified; typically, implementations will |
| * depend on an interface derived from TAsyncManager to be notified of changes |
| * in the transport state. |
| * |
| * The peeking, reading, writing and flushing methods must always be called |
| * from within the associated async manager (e.g. from a work item passed to |
| * TAsyncManager.execute() for this transport). |
| */ |
| interface TAsyncTransport : TTransport { |
| /** |
| * The TAsyncManager associated with this transport. |
| * |
| * Only a getter is declared by this interface. TAsyncManager.execute() |
| * compares this value against the manager a work item is submitted to. |
| */ |
| TAsyncManager asyncManager() @property; |
| } |
| |
| /** |
| * A TAsyncManager providing notifications for socket events. |
| */ |
| interface TAsyncSocketManager : TAsyncManager { |
| /** |
| * Adds a listener that is triggered once when an event of the specified type |
| * occurs, and removed afterwards. |
| * |
| * Two overloads exist: one taking a timeout and one without it. |
| * |
| * Params: |
| * socket = The socket to listen for events on. |
| * eventType = The type of the event to listen for. |
| * timeout = The period of time after which the listener will be called |
| * with TAsyncEventReason.TIMED_OUT if no event happened. Only taken by |
| * the first overload. NOTE(review): the overload without this parameter |
| * presumably never times out – confirm against the implementations. |
| * listener = The delegate to call when an event happened. It receives the |
| * TAsyncEventReason describing why it was called. |
| */ |
| void addOneshotListener(Socket socket, TAsyncEventType eventType, |
| Duration timeout, TSocketEventListener listener); |
| |
| /// Ditto |
| void addOneshotListener(Socket socket, TAsyncEventType eventType, |
| TSocketEventListener listener); |
| } |
| |
| /** |
| * Types of events that can happen for an asynchronous transport. |
| * |
| * Used as the eventType argument of TAsyncSocketManager.addOneshotListener() |
| * to select which kind of event a listener is registered for. |
| */ |
| enum TAsyncEventType { |
| READ, /// New data became available to read. |
| WRITE /// The transport became ready to be written to. |
| } |
| |
| /** |
| * The type of the delegates used to register socket event handlers. |
| * |
| * A listener takes a single TAsyncEventReason argument, which tells it why |
| * it was invoked, and returns nothing. |
| */ |
| alias void delegate(TAsyncEventReason callReason) TSocketEventListener; |
| |
| /** |
| * The reason a listener was called. |
| * |
| * Passed as the only argument to a TSocketEventListener. The underlying type |
| * of this enum is byte. |
| */ |
| enum TAsyncEventReason : byte { |
| NORMAL, /// The event listened for was triggered normally. |
| TIMED_OUT /// A timeout for the event was set, and it expired. |
| } |