Jake Farrell | b95b0ff | 2012-03-22 21:49:10 +0000 | [diff] [blame] | 1 | /* |
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | * or more contributor license agreements. See the NOTICE file |
| 4 | * distributed with this work for additional information |
| 5 | * regarding copyright ownership. The ASF licenses this file |
| 6 | * to you under the Apache License, Version 2.0 (the |
| 7 | * "License"); you may not use this file except in compliance |
| 8 | * with the License. You may obtain a copy of the License at |
| 9 | * |
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | * |
| 12 | * Unless required by applicable law or agreed to in writing, |
| 13 | * software distributed under the License is distributed on an |
| 14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | * KIND, either express or implied. See the License for the |
| 16 | * specific language governing permissions and limitations |
| 17 | * under the License. |
| 18 | */ |
| 19 | |
| 20 | /** |
| 21 | * Defines the interface used for client-side handling of asynchronous |
| 22 | * I/O operations, based on coroutines. |
| 23 | * |
| 24 | * The main piece of the »client side« (e.g. for TAsyncClient users) of the |
| 25 | * API is TFuture, which represents an asynchronously executed operation, |
| 26 | * which can have a return value, throw exceptions, and which can be waited |
| 27 | * upon. |
| 28 | * |
| 29 | * On the »implementation side«, the idea is that by using a TAsyncTransport |
| 30 | * instead of a normal TTransport and executing the work through a |
| 31 | * TAsyncManager, the same code as for synchronous I/O can be used for |
| 32 | * asynchronous operation as well, for example: |
| 33 | * |
| 34 | * --- |
| 35 | * auto socket = new TAsyncSocket(someTAsyncSocketManager(), host, port); |
| 36 | * // … |
| 37 | * socket.asyncManager.execute(socket, { |
| 38 | * SomeThriftStruct s; |
| 39 | * |
| 40 | * // Waiting for socket I/O will not block an entire thread but cause |
| 41 | * // the async manager to execute another task in the meantime, because |
| 42 | * // we are using TAsyncSocket instead of TSocket. |
| 43 | * s.read(socket); |
| 44 | * |
| 45 | * // Do something with s, e.g. set a TPromise result to it. |
| 46 | * writeln(s); |
| 47 | * }); |
| 48 | * --- |
| 49 | */ |
| 50 | module thrift.async.base; |
| 51 | |
| 52 | import core.time : Duration, dur; |
| 53 | import std.socket/+ : Socket+/; // DMD @@BUG314@@ |
| 54 | import thrift.base; |
| 55 | import thrift.transport.base; |
| 56 | import thrift.util.cancellation; |
| 57 | |
| 58 | /** |
| 59 | * Manages one or more asynchronous transport resources (e.g. sockets in the |
| 60 | * case of TAsyncSocketManager) and allows work items to be submitted for them. |
| 61 | * |
| 62 | * Implementations will typically run one or more background threads for |
| 63 | * executing the work, which is one of the reasons for a TAsyncManager to be |
| 64 | * used. Each work item is run in its own fiber and is expected to yield() away |
| 65 | * while waiting for time-consuming operations. |
| 66 | * |
| 67 | * The second important purpose of TAsyncManager is to serialize access to |
| 68 | * the transport resources – without taking care of that, e.g. issuing multiple |
| 69 | * RPC calls over the same connection in rapid succession would likely lead to |
| 70 | * more than one request being written at the same time, causing only garbage |
| 71 | * to arrive at the remote end. |
| 72 | * |
| 73 | * All methods are thread-safe. |
| 74 | */ |
| 75 | interface TAsyncManager { |
| 76 | /** |
| 77 | * Submits a work item to be executed asynchronously. |
| 78 | * |
| 79 | * Access to async transports is serialized – if two work items associated |
| 80 | * with the same transport are submitted, the second delegate will not be |
| 81 | * invoked until the first has returned, even if the first context-switches |
| 82 | * away (because it is waiting for I/O) and the async manager is idle |
| 83 | * otherwise. |
| 84 | * |
| 85 | * Optionally, a TCancellation instance can be specified. If present, |
| 86 | * triggering it will be considered a request to cancel the work item, if it |
| 87 | * is still waiting for the associated transport to become available. |
| 88 | * Delegates which are already being processed (i.e. waiting for I/O) are not |
| 89 | * affected because this would bring the connection into an undefined state |
| 90 | * (as probably a half-written request or a half-read response would be left |
| 91 | * behind). |
| 92 | * |
| 93 | * Params: |
| 94 | * transport = The TAsyncTransport the work delegate will operate on. Must |
| 95 | * be associated with this TAsyncManager instance. |
| 96 | * work = The operations to execute on the given transport. Must never |
| 97 | * throw, errors should be handled in another way. nothrow semantics are |
| 98 | * difficult to enforce in combination with fibers though, so currently |
| 99 | * exceptions are just swallowed by TAsyncManager implementations. |
| 100 | * cancellation = If set, can be used to request cancellation of this work |
| 101 | * item if it is still waiting to be executed. |
| 102 | * |
| 103 | * Note: The work item will likely be executed in a different thread, so make |
| 104 | * sure the code it relies on is thread-safe. The async transports themselves |
| 105 | * are an exception, since access to them is serialized as noted above. |
| 106 | */ |
| 107 | void execute(TAsyncTransport transport, void delegate() work, |
| 108 | TCancellation cancellation = null |
| 109 | ) in { |
| 110 | assert(transport.asyncManager is this, |
| 111 | "The given transport must be associated with this TAsyncManager."); |
| 112 | } |
| 113 | |
| 114 | /** |
| 115 | * Submits a delegate to be executed after a certain amount of time has |
| 116 | * passed. |
| 117 | * |
| 118 | * The actual amount of time elapsed can be higher if the async manager |
| 119 | * instance is busy and thus should not be relied on. |
| 120 | * |
| 121 | * Params: |
| 122 | * duration = The amount of time to wait before starting to execute the |
| 123 | * work delegate. |
| 124 | * work = The code to execute after the specified amount of time has passed. |
| 125 | * |
| 126 | * Example: |
| 127 | * --- |
| 128 | * // A very basic example – usually, the actual work item would enqueue |
| 129 | * // some async transport operation. |
| 130 | * auto asyncManager = someAsyncManager(); |
| 131 | * |
| 132 | * TFuture!int calculate() { |
| 133 | * // Create a promise and asynchronously set its value after three |
| 134 | * // seconds have passed. |
| 135 | * auto promise = new TPromise!int; |
| 136 | * asyncManager.delay(dur!"seconds"(3), { |
| 137 | * promise.succeed(42); |
| 138 | * }); |
| 139 | * |
| 140 | * // Immediately return it to the caller. |
| 141 | * return promise; |
| 142 | * } |
| 143 | * |
| 144 | * // This will wait until the result is available and then print it. |
| 145 | * writeln(calculate().waitGet()); |
| 146 | * --- |
| 147 | */ |
| 148 | void delay(Duration duration, void delegate() work); |
| 149 | |
| 150 | /** |
| 151 | * Shuts down all background threads or other facilities that might have |
| 152 | * been started in order to execute work items. This function is typically |
| 153 | * called during program shutdown. (NOTE(review): the meaning of the bool result is undocumented; confirm.) |
| 154 | * |
| 155 | * If there are still tasks to be executed when the timeout expires, any |
| 156 | * currently executed work items will never receive any notifications |
| 157 | * for async transports managed by this instance, queued work items will |
| 158 | * be silently dropped, and implementations are allowed to leak resources. |
| 159 | * |
| 160 | * Params: |
| 161 | * waitFinishTimeout = If positive, waits for all work items to be |
| 162 | * finished for the specified amount of time, if negative, waits for |
| 163 | * completion without ever timing out, if zero, immediately shuts down |
| 164 | * the background facilities. Defaults to a negative value (wait forever). |
| 165 | */ |
| 166 | bool stop(Duration waitFinishTimeout = dur!"hnsecs"(-1)); |
| 167 | } |
| 168 | |
| 169 | /** |
| 170 | * A TTransport which uses a TAsyncManager to schedule non-blocking operations. |
| 171 | * |
| 172 | * The actual type of device is not specified; typically, implementations will |
| 173 | * depend on an interface derived from TAsyncManager to be notified of changes |
| 174 | * in the transport state. |
| 175 | * |
| 176 | * The peeking, reading, writing and flushing methods must always be called |
| 177 | * from within the associated async manager, e.g. from a work item submitted via its execute() method. |
| 178 | */ |
| 179 | interface TAsyncTransport : TTransport { |
| 180 | /** |
| 181 | * The TAsyncManager associated with this transport (the one TAsyncManager.execute() requires work for this transport to be submitted to). |
| 182 | */ |
| 183 | TAsyncManager asyncManager() @property; |
| 184 | } |
| 185 | |
| 186 | /** |
| 187 | * A TAsyncManager providing notifications for socket events. |
| 188 | */ |
| 189 | interface TAsyncSocketManager : TAsyncManager { |
| 190 | /** |
| 191 | * Adds a listener that is triggered once when an event of the specified type |
| 192 | * occurs, and removed afterwards. |
| 193 | * |
| 194 | * Params: |
| 195 | * socket = The socket to listen for events at. |
| 196 | * eventType = The type of the event to listen for. |
| 197 | * timeout = The period of time after which the listener will be called |
| 198 | * with TAsyncEventReason.TIMED_OUT if no event happened. |
| 199 | * listener = The delegate to call when an event happened. |
| 200 | */ |
| 201 | void addOneshotListener(Socket socket, TAsyncEventType eventType, |
| 202 | Duration timeout, TSocketEventListener listener); |
| 203 | |
| 204 | /// Ditto |
| 205 | void addOneshotListener(Socket socket, TAsyncEventType eventType, |
| 206 | TSocketEventListener listener); |
| 207 | } |
| 208 | |
| 209 | /** |
| 210 | * Types of events that can happen for an asynchronous transport, as passed to TAsyncSocketManager.addOneshotListener(). |
| 211 | */ |
| 212 | enum TAsyncEventType { |
| 213 | READ, /// New data became available to read. |
| 214 | WRITE /// The transport became ready to be written to. |
| 215 | } |
| 216 | |
| 217 | /** |
| 218 | * The type of the delegates used to register socket event handlers; callReason tells the handler why it was invoked (see TAsyncEventReason). |
| 219 | */ |
| 220 | alias void delegate(TAsyncEventReason callReason) TSocketEventListener; |
| 221 | |
| 222 | /** |
| 223 | * The reason a listener (TSocketEventListener) was called. |
| 224 | */ |
| 225 | enum TAsyncEventReason : byte { |
| 226 | NORMAL, /// The event listened for was triggered normally. |
| 227 | TIMED_OUT /// A timeout for the event was set, and it expired. |
| 228 | } |