Jake Farrell | b95b0ff | 2012-03-22 21:49:10 +0000 | [diff] [blame] | 1 | /* |
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | * or more contributor license agreements. See the NOTICE file |
| 4 | * distributed with this work for additional information |
| 5 | * regarding copyright ownership. The ASF licenses this file |
| 6 | * to you under the Apache License, Version 2.0 (the |
| 7 | * "License"); you may not use this file except in compliance |
| 8 | * with the License. You may obtain a copy of the License at |
| 9 | * |
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | * |
| 12 | * Unless required by applicable law or agreed to in writing, |
| 13 | * software distributed under the License is distributed on an |
| 14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | * KIND, either express or implied. See the License for the |
| 16 | * specific language governing permissions and limitations |
| 17 | * under the License. |
| 18 | */ |
| 19 | module thrift.util.future; |
| 20 | |
| 21 | import core.atomic; |
| 22 | import core.sync.condition; |
| 23 | import core.sync.mutex; |
| 24 | import core.time : Duration; |
| 25 | import std.array : empty, front, popFront; |
| 26 | import std.conv : to; |
| 27 | import std.exception : enforce; |
| 28 | import std.traits : BaseTypeTuple, isSomeFunction, ParameterTypeTuple, ReturnType; |
| 29 | import thrift.base; |
| 30 | import thrift.util.awaitable; |
| 31 | import thrift.util.cancellation; |
| 32 | |
/**
 * Represents an operation which is executed asynchronously and the result of
 * which will become available at some point in the future.
 *
 * Once an operation is completed, the result of the operation can be fetched
 * via the get() family of methods. There are three possible cases: Either the
 * operation succeeded, then its return value is returned, or it failed by
 * throwing, in which case the exception is rethrown, or it was cancelled
 * before, then a TCancelledException is thrown. There might be TFuture
 * implementations which never possibly enter the cancelled state.
 *
 * All methods are thread-safe, but keep in mind that any exception object or
 * result (if it is a reference type, of course) is shared between all
 * get()-family invocations.
 */
interface TFuture(ResultType) {
  /**
   * The status the operation is currently in.
   *
   * An operation starts out in RUNNING status, and changes state to one of the
   * others at most once afterwards.
   */
  TFutureStatus status() @property;

  /**
   * A TAwaitable triggered when the operation leaves the RUNNING status.
   */
  TAwaitable completion() @property;

  /**
   * Convenience shorthand for waiting (without a time limit) until the
   * operation has completed and then get()ing its result.
   *
   * If the operation has already completed, the result is immediately
   * returned.
   *
   * The result of this method is "alias this"'d to the interface (via
   * waitGetProperty), so that a TFuture can be used as a drop-in replacement
   * for a simple value in synchronous code.
   *
   * Throws: Whatever get() throws for a failed or cancelled operation.
   */
  final ResultType waitGet() {
    completion.wait();
    return get();
  }
  // Property wrapper around waitGet(); it exists so that there is a single
  // symbol the alias this declaration below can refer to.
  final @property auto waitGetProperty() { return waitGet(); }
  alias waitGetProperty this;

  /**
   * Convenience shorthand for waiting at most the given amount of time for the
   * operation to complete and then get()ing its result.
   *
   * If the operation completes in time, returns its result (resp. throws an
   * exception for the failed/cancelled cases). If not, throws a
   * TFutureException.
   *
   * Params:
   *   timeout = The maximum duration passed on to completion.wait().
   */
  final ResultType waitGet(Duration timeout) {
    enforce(completion.wait(timeout), new TFutureException(
      "Operation did not complete in time."));
    return get();
  }

  /**
   * Returns the result of the operation.
   *
   * Throws: TFutureException if the operation has not completed yet,
   *   TCancelledException if it has been cancelled; the exception captured
   *   from the operation if it failed.
   */
  ResultType get();

  /**
   * Returns the captured exception if the operation failed, or null otherwise.
   *
   * Throws: TFutureException if not yet done, TCancelledException if the
   *   operation has been cancelled.
   */
  Exception getException();
}
| 111 | |
/**
 * Enumerates the states an operation exposed through a TFuture can be in.
 *
 * RUNNING is the first member and therefore the default (.init) value.
 */
enum TFutureStatus : byte {
  /// The operation is still running.
  RUNNING,

  /// The operation completed without throwing an exception.
  SUCCEEDED,

  /// The operation completed by throwing an exception.
  FAILED,

  /// The operation was cancelled.
  CANCELLED,
}
| 121 | |
/**
 * A TFuture for the simple but common case in which the outcome is supplied
 * explicitly by calling succeed(), fail(), complete() or cancel().
 *
 * All methods are thread-safe. In the usual setup, the settling methods are
 * called from a single producer thread, while one or more other threads wait
 * for and read the outcome through the TFuture interface.
 */
class TPromise(ResultType) : TFuture!ResultType {
  /// Creates a new promise; it starts out in RUNNING status.
  this() {
    statusMutex_ = new Mutex;
    completionEvent_ = new TOneshotEvent;
  }

  /// The status the promise is currently in.
  override S status() const @property {
    return atomicLoad(status_);
  }

  /// The event which is triggered once the promise has been settled.
  override TAwaitable completion() @property {
    return completionEvent_;
  }

  /**
   * Returns the result the promise has been fulfilled with.
   *
   * Throws: TFutureException if the promise is still running,
   *   TCancelledException if it has been cancelled; the stored exception if
   *   it has been marked as failed.
   */
  override ResultType get() {
    if (settledStatus() == S.FAILED) throw exception_;

    static if (!is(ResultType == void)) {
      return result_;
    }
  }

  /**
   * Returns the stored exception if the promise has been marked as failed,
   * or null if it succeeded.
   *
   * Throws: TFutureException if the promise is still running,
   *   TCancelledException if it has been cancelled.
   */
  override Exception getException() {
    return (settledStatus() == S.SUCCEEDED) ? null : exception_;
  }

  static if (!is(ResultType == void)) {
    /**
     * Stores the result of the operation, marks the promise as succeeded and
     * notifies any waiters.
     *
     * Does nothing if the promise has been cancelled before.
     *
     * Throws: TFutureException if the promise is already completed.
     */
    void succeed(ResultType result) {
      synchronized (statusMutex_) {
        if (!stillRunning()) return;

        // The payload has to be in place before the new status is published.
        result_ = result;
        atomicStore(status_, S.SUCCEEDED);
      }

      completionEvent_.trigger();
    }
  } else {
    /**
     * Marks the promise as succeeded and notifies any waiters.
     *
     * Does nothing if the promise has been cancelled before.
     *
     * Throws: TFutureException if the promise is already completed.
     */
    void succeed() {
      synchronized (statusMutex_) {
        if (!stillRunning()) return;

        atomicStore(status_, S.SUCCEEDED);
      }

      completionEvent_.trigger();
    }
  }

  /**
   * Marks the promise as failed with the given exception and notifies any
   * waiters.
   *
   * Does nothing if the promise has been cancelled before.
   *
   * Throws: TFutureException if the promise is already completed.
   */
  void fail(Exception exception) {
    synchronized (statusMutex_) {
      if (!stillRunning()) return;

      // The payload has to be in place before the new status is published.
      exception_ = exception;
      atomicStore(status_, S.FAILED);
    }

    completionEvent_.trigger();
  }

  /**
   * Settles this promise with the outcome of another, already completed
   * TFuture of the same type.
   *
   * Does nothing if this promise has been cancelled before. If the other
   * operation was cancelled, this promise is marked as failed with a newly
   * created TCancelledException.
   *
   * Throws: TFutureException if the passed in future is not completed yet or
   *   if this promise is already completed.
   */
  void complete(TFuture!ResultType future) {
    synchronized (statusMutex_) {
      if (!stillRunning()) return;

      enforce(future.status != S.RUNNING, new TFutureException(
        "The passed TFuture is not yet completed."));

      auto outcome = future.status;
      if (outcome == S.FAILED) {
        exception_ = future.getException();
      } else if (outcome == S.CANCELLED) {
        // A cancelled source is reported as a failure of this promise.
        exception_ = new TCancelledException;
        outcome = S.FAILED;
      } else static if (!is(ResultType == void)) {
        result_ = future.get();
      }

      atomicStore(status_, outcome);
    }

    completionEvent_.trigger();
  }

  /**
   * Marks the promise as cancelled if it is still running. The completion
   * event is triggered in any case.
   */
  void cancel() {
    synchronized (statusMutex_) {
      if (atomicLoad(status_) == S.RUNNING) {
        atomicStore(status_, S.CANCELLED);
      }
    }

    completionEvent_.trigger();
  }

private:
  // Convenience alias because TFutureStatus is ubiquitous in this class.
  alias TFutureStatus S;

  // Shared precondition of all settling methods; must be called with
  // statusMutex_ held. Returns false if the promise has been cancelled (the
  // caller is expected to silently do nothing then), true if it is still
  // running, and throws if it has already been settled otherwise.
  bool stillRunning() {
    auto current = atomicLoad(status_);
    if (current == S.CANCELLED) return false;

    if (current != S.RUNNING) {
      throw new TFutureException("Operation already completed.");
    }
    return true;
  }

  // Shared precondition of the getters: Returns the current status if it is
  // SUCCEEDED or FAILED, throws for the RUNNING and CANCELLED states.
  S settledStatus() {
    auto current = atomicLoad(status_);
    if (current == S.RUNNING) {
      throw new TFutureException("Operation not yet completed.");
    }
    if (current == S.CANCELLED) throw new TCancelledException;
    return current;
  }

  // The status the promise is currently in.
  shared S status_;

  // Only one of the two payloads is ever valid, thus they share storage.
  union {
    static if (!is(ResultType == void)) {
      // Valid if status_ is SUCCEEDED.
      ResultType result_;
    }
    // Valid if status_ is FAILED.
    Exception exception_;
  }

  // Protects status_.
  // result_ and exception_ are written at most once, while status_ is still
  // RUNNING and before the new status is stored, so reading them after having
  // observed a completed status does not require holding the lock.
  Mutex statusMutex_;

  // Triggered when the promise leaves the RUNNING status.
  TOneshotEvent completionEvent_;
}
| 307 | |
/**
 * Exception type used by the future implementation in this module, e.g. when
 * a result is requested before the operation has completed, when waiting for
 * completion timed out, or when an already completed promise is completed
 * again.
 */
class TFutureException : TException {
  /**
   * Constructs a new instance; all arguments are passed on unchanged to the
   * TException constructor.
   *
   * Params:
   *   msg = The error message.
   *   file = The source file name, defaults to the file of the call site.
   *   line = The source line number, defaults to the line of the call site.
   *   next = The next exception in the chain, if any.
   */
  this(string msg = "", string file = __FILE__, size_t line = __LINE__,
    Throwable next = null)
  {
    super(msg, file, line, next);
  }
}
| 317 | |
/**
 * Creates an interface that is similar to a given one, but accepts an
 * additional, optional TCancellation parameter for each method, and returns
 * TFutures instead of plain return values.
 *
 * If the given interface is derived from other interfaces, the generated one
 * is derived from the TFutureInterface versions of them.
 *
 * For example, given the following declarations:
 * ---
 * interface Foo {
 *   void bar();
 *   string baz(int a);
 * }
 * alias TFutureInterface!Foo FutureFoo;
 * ---
 *
 * FutureFoo would be equivalent to:
 * ---
 * interface FutureFoo {
 *   TFuture!void bar(TCancellation cancellation = null);
 *   TFuture!string baz(int a, TCancellation cancellation = null);
 * }
 * ---
 */
template TFutureInterface(Interface) if (is(Interface _ == interface)) {
  // The source code of the interface declaration is put together at compile
  // time by the function literal below, which is invoked right away, and the
  // resulting string is mixed in. As the declaration carries the same name as
  // the enclosing template, TFutureInterface!Foo directly refers to it.
  mixin({
    string code = "interface TFutureInterface \n";

    // Mirror the inheritance hierarchy: Derive from the future version of
    // every direct base interface. The bases are referred to by index into
    // BaseTypeTuple!Interface instead of by name, so that the generated code
    // only depends on symbols which are visible in this scope.
    static if (is(Interface Bases == super) && Bases.length > 0) {
      code ~= ": ";
      foreach (i; 0 .. Bases.length) {
        if (i > 0) code ~= ", ";
        code ~= "TFutureInterface!(BaseTypeTuple!Interface[" ~ to!string(i) ~ "]) ";
      }
    }

    code ~= "{\n";

    // Only members declared by Interface itself are handled here, inherited
    // ones are covered by the base interfaces generated above.
    // NOTE(review): Every member name is visited only once – it looks like
    // just a single declaration is emitted for a set of overloads; confirm.
    foreach (methodName; __traits(derivedMembers, Interface)) {
      enum qn = "Interface." ~ methodName;
      // Skip anything that is not a function.
      static if (isSomeFunction!(mixin(qn))) {
        // Same name and parameter types as the original method, followed by
        // the optional cancellation parameter; the original return type is
        // wrapped in a TFuture.
        code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~
          "(ParameterTypeTuple!(" ~ qn ~ "), TCancellation cancellation = null);\n";
      }
    }

    code ~= "}\n";
    return code;
  }());
}
| 366 | |
/**
 * An input range that aggregates results from multiple asynchronous operations,
 * returning them in the order they arrive.
 *
 * Futures which failed are skipped, their exceptions are collected and can be
 * retrieved via exceptions(). Futures which were cancelled are skipped as well.
 *
 * Additionally, a timeout can be set after which results from not yet finished
 * futures will no longer be waited for, e.g. to ensure the time it takes to
 * iterate over a set of results is limited.
 *
 * The range itself is meant to be consumed from a single thread; only the
 * internal queue of completed futures is accessed by multiple threads.
 */
final class TFutureAggregatorRange(T) {
  /**
   * Constructs a new instance.
   *
   * Params:
   *   futures = The set of futures to collect results from.
   *   childCancellation = Triggered when the timeout expires, so that the
   *     operations which are still running at that point can be cancelled.
   *   timeout = If positive, not yet finished futures will be cancelled and
   *     their results will not be taken into account after this amount of
   *     time. If zero (the default) or negative, no timeout is applied and
   *     the range waits until all futures have left the RUNNING status.
   */
  this(TFuture!T[] futures, TCancellationOrigin childCancellation,
    Duration timeout = dur!"hnsecs"(0)
  ) {
    hasTimeout_ = (timeout > dur!"hnsecs"(0));
    if (hasTimeout_) {
      timeoutSysTick_ = TickDuration.currSystemTick +
        TickDuration.from!"hnsecs"(timeout.total!"hnsecs");
    } else {
      timeoutSysTick_ = TickDuration(0);
    }

    queueMutex_ = new Mutex;
    queueNonEmptyCondition_ = new Condition(queueMutex_);
    futures_ = futures;
    childCancellation_ = childCancellation;

    foreach (future; futures_) {
      // The outer function literal is invoked immediately to give every
      // callback its own copy of the loop variable.
      future.completion.addCallback({
        auto f = future;
        return {
          auto status = f.status;
          assert(status != TFutureStatus.RUNNING);

          synchronized (queueMutex_) {
            if (status == TFutureStatus.CANCELLED) {
              // Cancelled futures never yield a result; they are only counted
              // so that the consumer can tell when nothing more can arrive.
              ++cancelledCount_;
            } else {
              completedQueue_ ~= f;
            }

            // Wake up the consumer in both cases: Either there is a new item
            // to process, or the range might have run dry for good.
            queueNonEmptyCondition_.notifyAll();
          }
        };
      }());
    }
  }

  /**
   * Whether the range is empty.
   *
   * This is the case if the results from the completed futures not having
   * failed have already been popped and either all futures have finished
   * (or have been cancelled) or the timeout has expired.
   *
   * Potentially blocks until a new result is available, no more results can
   * arrive, or the timeout has expired. When the timeout expires, the child
   * cancellation passed to the constructor is triggered.
   */
  bool empty() @property {
    // A result which has already been fetched must be handed out even if the
    // range was marked as finished in the meantime, so check the buffer first.
    if (bufferFilled_) return false;
    if (finished_) return true;

    while (true) {
      TFuture!T future;
      bool timedOut = false;

      synchronized (queueMutex_) {
        // The loop also takes care of spurious wakeups and of wakeups caused
        // by cancelled futures, which do not add anything to the queue.
        while (completedQueue_.empty) {
          if (completedCount_ + cancelledCount_ >= futures_.length) {
            // Every future has been accounted for, there is no possibility
            // another result could ever become available.
            finished_ = true;
            return true;
          }

          if (!hasTimeout_) {
            queueNonEmptyCondition_.wait();
            continue;
          }

          auto remaining = to!Duration(timeoutSysTick_ -
            TickDuration.currSystemTick);
          if (remaining <= dur!"hnsecs"(0)) {
            // No time left, but still no element received.
            timedOut = true;
            break;
          }

          queueNonEmptyCondition_.wait(remaining);
        }

        if (!timedOut) {
          future = completedQueue_.front;
          completedQueue_.popFront();
        }
      }

      if (timedOut) {
        // The cancellation is triggered only after the queue lock has been
        // released, so that callbacks invoked by it cannot contend for it.
        finished_ = true;
        childCancellation_.trigger();
        return true;
      }

      ++completedCount_;

      if (future.status == TFutureStatus.FAILED) {
        // This one failed, loop again and try getting another item from
        // the queue (or detect that there is nothing left to wait for).
        exceptions_ ~= future.getException();
      } else {
        resultBuffer_ = future.get();
        bufferFilled_ = true;
        return false;
      }
    }
  }

  /**
   * Returns the first element from the range.
   *
   * Potentially blocks until a new result is available or the timeout has
   * expired.
   *
   * Throws: TException if the range is empty.
   */
  T front() {
    enforce(!empty, new TException(
      "Cannot get front of an empty future aggregator range."));
    return resultBuffer_;
  }

  /**
   * Removes the first element from the range.
   *
   * Potentially blocks until a new result is available or the timeout has
   * expired.
   *
   * Throws: TException if the range is empty.
   */
  void popFront() {
    enforce(!empty, new TException(
      "Cannot pop front of an empty future aggregator range."));
    bufferFilled_ = false;
  }

  /**
   * The number of futures the result of which has been returned or which have
   * failed so far. Cancelled futures are not included.
   */
  size_t completedCount() @property const {
    return completedCount_;
  }

  /**
   * The exceptions collected from failed TFutures so far.
   */
  Exception[] exceptions() @property {
    return exceptions_;
  }

private:
  TFuture!T[] futures_;
  TCancellationOrigin childCancellation_;

  // Whether a (positive) timeout has been requested.
  bool hasTimeout_;

  // The system tick this operation will time out, or zero if no timeout has
  // been set.
  TickDuration timeoutSysTick_;

  // Set once no further results will be produced.
  bool finished_;

  // Whether resultBuffer_ holds a result which has not been popped yet.
  bool bufferFilled_;
  T resultBuffer_;

  Exception[] exceptions_;

  // The number of futures taken from the queue so far (only touched by the
  // consuming thread).
  size_t completedCount_;

  // The queue of completed futures and the number of cancelled ones. These
  // (and the associated condition) are the only parts of this class that are
  // accessed by multiple threads; they are protected by queueMutex_.
  TFuture!T[] completedQueue_;
  size_t cancelledCount_;
  Mutex queueMutex_;
  Condition queueNonEmptyCondition_;
}
| 539 | |
/**
 * Construction helper for TFutureAggregatorRange which lets the value type be
 * deduced from the arguments, i.e. allows the range to be created using IFTI
 * instead of having to spell out the type explicitly (see
 * $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
 *
 * All arguments are passed on unchanged to the TFutureAggregatorRange
 * constructor.
 */
TFutureAggregatorRange!T tFutureAggregatorRange(T)(TFuture!T[] futures,
  TCancellationOrigin childCancellation, Duration timeout = dur!"hnsecs"(0)
) {
  auto range = new TFutureAggregatorRange!T(futures, childCancellation,
    timeout);
  return range;
}