1 /** 2 * An implementation of promises. 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Vladimir Panteleev <ae@cy.md> 12 */ 13 14 module ae.utils.promise; 15 16 import std.functional; 17 import std.meta : allSatisfy, AliasSeq; 18 import std.traits : CommonType; 19 20 import ae.net.asockets : socketManager, onNextTick; 21 22 debug (no_ae_promise) {} else debug debug = ae_promise; 23 24 /** 25 A promise for a value `T` or error `E`. 26 27 Attempts to implement the Promises/A+ spec 28 (https://promisesaplus.com/), 29 with the following deviations: 30 31 - Sections 2.2.1.1-2, 2.2.7.3-4: Due to D strong typing, the only 32 applicable interpretation of "not a function" is `null`. 33 34 - Section 2.2.5: JavaScript-specific, and does not apply to D. 35 36 - Section 2.2.7.2: In D, thrown objects may only be descendants of 37 `Throwable`. By default, `Exception` objects are caught, and 38 passed to `onRejected` handlers. 39 40 - Section 2.2.7.1/3: In the case when `onFulfilled` is `null` but 41 `onRejected` is not, the returned promise may be resolved with 42 either the fulfilled value of the current promise or the return 43 value of `onRejected`. In this case, the type of the returned 44 promise value is the D common type of the two, or `void` if none. 45 46 - Section 2.3.1: Instead of rejecting the promise with a TypeError, 47 an assertion failure is thrown. 48 49 - Section 2.3.3: Not implemented. This section facilitates 50 interoperability with other implementations of JavaScript 51 promises, though it could be implemented in D using DbI to 52 support arbitrary then-able objects. 53 54 Additionally, this implementation differs from typical JavaScript 55 implementations as follows: 56 57 - `T` may be `void`. In this case, `fulfill`, and the delegate in 58 first argument of `then`, take zero arguments instead of one. 59 60 - Instead of the constructor accepting a function which accepts the 61 `fulfill` / `reject` functions, these functions are available as 62 regular methods. 63 64 - Attempts to fulfill or reject a non-pending promise cause an 65 assertion failure instead of being silently ignored. 66 (The Promises/A+ standard touches on this in section 2.3.3.3.3.) 67 68 - `catch` is called `except` (because the former is a reserved D 69 keyword). 70 71 - `finally` is called `finish` (because the former is a reserved D 72 keyword). 73 74 - In debug builds, resolved `Promise` instances check on 75 destruction that their value / error was passed on to a handler 76 (unless they have been successfully fulfilled to a `void` value). 77 Such leaks are reported to the standard error stream. 78 */ 79 final class Promise(T, E : Throwable = Exception) 80 { 81 private: 82 /// Box of `T`, if it's not `void`, otherwise empty `struct`. 83 struct Box 84 { 85 static if (!is(T == void)) 86 T value; 87 } 88 89 alias A = typeof(Box.tupleof); 90 91 PromiseState state; 92 93 union 94 { 95 Box value; 96 E error; 97 } 98 99 PromiseHandler[] handlers; 100 101 enum isNoThrow = is(typeof(delegate void(void delegate() fun) nothrow { try fun(); catch (E) {} })); 102 103 private struct PromiseHandler 104 { 105 static if (isNoThrow) 106 void delegate() nothrow dg; 107 else 108 void delegate() dg; 109 bool onFulfill, onReject; 110 } 111 112 void doFulfill(A value) /*nothrow*/ 113 { 114 this.state = PromiseState.fulfilled; 115 this.value.tupleof = value; 116 static if (!is(T == void)) 117 debug (ae_promise) markAsUnused(); 118 foreach (ref handler; handlers) 119 if (handler.onFulfill) 120 handler.dg(); 121 handlers = null; 122 } 123 124 void doReject(E e) /*nothrow*/ 125 { 126 this.state = PromiseState.rejected; 127 this.error = e; 128 debug (ae_promise) markAsUnused(); 129 foreach (ref handler; handlers) 130 if (handler.onReject) 131 handler.dg(); 132 handlers = null; 133 } 134 135 /// Implements the [[Resolve]](promise, x) resolution procedure. 136 void resolve(scope lazy T valueExpr) /*nothrow*/ 137 { 138 Box box; 139 static if (is(T == void)) 140 valueExpr; 141 else 142 box.value = valueExpr; 143 144 fulfill(box.tupleof); 145 } 146 147 /// ditto 148 void resolve(Promise!(T, E) x) /*nothrow*/ 149 { 150 assert(x !is this, "Attempting to resolve a promise with itself"); 151 assert(this.state == PromiseState.pending); 152 this.state = PromiseState.following; 153 x.then(&resolveFulfill, &resolveReject); 154 } 155 156 void resolveFulfill(A value) /*nothrow*/ 157 { 158 assert(this.state == PromiseState.following); 159 doFulfill(value); 160 } 161 162 void resolveReject(E e) /*nothrow*/ 163 { 164 assert(this.state == PromiseState.following); 165 doReject(e); 166 } 167 168 // This debug machinery tracks leaked promises, i.e. promises 169 // which have been fulfilled/rejected, but their result was never 170 // used (their .then method was never called). 171 debug (ae_promise) 172 { 173 // Global doubly linked list of promises with unused results 174 static typeof(this) unusedHead, unusedTail; 175 typeof(this) unusedPrev, unusedNext; 176 bool isUnused() { return unusedPrev || (unusedHead is this); } 177 178 LeakedPromiseError leakedPromiseError; 179 bool resultUsed; 180 181 void markAsUnused() 182 { 183 if (resultUsed) 184 return; // An earlier `then` call has priority 185 assert(!isUnused); 186 if (unusedTail) 187 { 188 unusedPrev = unusedTail; 189 unusedTail.unusedNext = this; 190 } 191 unusedTail = this; 192 if (!unusedHead) 193 unusedHead = this; 194 } 195 196 void markAsUsed() 197 { 198 if (resultUsed) 199 return; 200 resultUsed = true; 201 if (isUnused) 202 { 203 if (unusedPrev) unusedPrev.unusedNext = unusedNext; else unusedHead = unusedNext; 204 if (unusedNext) unusedNext.unusedPrev = unusedPrev; else unusedTail = unusedPrev; 205 } 206 } 207 208 static ~this() 209 { 210 for (auto p = unusedHead; p; p = p.unusedNext) 211 { 212 // If these asserts fail, there is a bug in our debug machinery 213 assert(p.state != PromiseState.pending && p.state != PromiseState.following && !p.resultUsed); 214 static if (is(T == void)) 215 assert(p.state != PromiseState.fulfilled); 216 217 import core.stdc.stdio : fprintf, stderr; 218 fprintf(stderr, "Leaked %s %s\n", 219 p.state == PromiseState.fulfilled ? "fulfilled".ptr : "rejected".ptr, 220 typeof(this).stringof.ptr); 221 if (p.state == PromiseState.rejected) 222 _d_print_throwable(p.error); 223 _d_print_throwable(p.leakedPromiseError); 224 } 225 } 226 } 227 228 public: 229 debug (ae_promise) 230 this() nothrow 231 { 232 // Record instantiation point 233 try 234 throw new LeakedPromiseError(); 235 catch (LeakedPromiseError e) 236 leakedPromiseError = e; 237 catch (Throwable) {} // allow nothrow 238 } 239 240 /// A tuple of this `Promise`'s value. 241 /// Either `(T)` or an empty tuple. 242 alias ValueTuple = A; 243 244 /// Work-around for DMD bug 21804: 245 /// https://issues.dlang.org/show_bug.cgi?id=21804 246 /// If your `then` callback argument is a tuple, 247 /// insert this call before the `then` call. 248 /// (Needs to be done only once per `Promise!T` instance.) 249 typeof(this) dmd21804workaround() 250 { 251 static if (!is(T == void)) 252 if (false) 253 then((A result) {}); 254 return this; 255 } 256 257 /// Ignore this promise leaking in debug builds. 258 void ignoreResult() 259 { 260 debug (ae_promise) markAsUsed(); 261 } 262 263 /// Fulfill this promise, with the given value (if applicable). 264 void fulfill(A value) /*nothrow*/ 265 { 266 assert(this.state == PromiseState.pending, 267 "This promise is already fulfilled, rejected, or following another promise."); 268 doFulfill(value); 269 } 270 271 /// Reject this promise, with the given exception. 272 void reject(E e) /*nothrow*/ 273 { 274 assert(this.state == PromiseState.pending, 275 "This promise is already fulfilled, rejected, or following another promise."); 276 doReject(e); 277 } 278 279 /// Registers the specified fulfillment and rejection handlers. 280 /// If the promise is already resolved, they are called 281 /// as soon as possible (but not immediately). 282 Promise!(Unpromise!R, F) then(R, F = E)(R delegate(A) onFulfilled, R delegate(E) onRejected = null) /*nothrow*/ 283 { 284 static if (!is(T : R)) 285 assert(onFulfilled, "Cannot implicitly propagate " ~ T.stringof ~ " to " ~ R.stringof ~ " due to null onFulfilled"); 286 287 auto next = new typeof(return); 288 289 void fulfillHandler() /*nothrow*/ 290 { 291 assert(this.state == PromiseState.fulfilled); 292 if (onFulfilled) 293 { 294 try 295 next.resolve(onFulfilled(this.value.tupleof)); 296 catch (F e) 297 next.reject(e); 298 } 299 else 300 { 301 static if (is(R == void)) 302 next.fulfill(); 303 else 304 { 305 static if (!is(T : R)) 306 assert(false); // verified above 307 else 308 next.fulfill(this.value.tupleof); 309 } 310 } 311 } 312 313 void rejectHandler() /*nothrow*/ 314 { 315 assert(this.state == PromiseState.rejected); 316 if (onRejected) 317 { 318 try 319 next.resolve(onRejected(this.error)); 320 catch (F e) 321 next.reject(e); 322 } 323 else 324 next.reject(this.error); 325 } 326 327 final switch (this.state) 328 { 329 case PromiseState.pending: 330 case PromiseState.following: 331 handlers ~= PromiseHandler({ callSoon(&fulfillHandler); }, true, false); 332 handlers ~= PromiseHandler({ callSoon(&rejectHandler); }, false, true); 333 break; 334 case PromiseState.fulfilled: 335 callSoon(&fulfillHandler); 336 break; 337 case PromiseState.rejected: 338 callSoon(&rejectHandler); 339 break; 340 } 341 342 debug (ae_promise) markAsUsed(); 343 return next; 344 } 345 346 /// Special overload of `then` with no `onFulfilled` function. 347 /// In this scenario, `onRejected` can act as a filter, 348 /// converting errors into values for the next promise in the chain. 349 Promise!(CommonType!(Unpromise!R, T), F) then(R, F = E)(typeof(null) onFulfilled, R delegate(E) onRejected) /*nothrow*/ 350 { 351 // The returned promise will be fulfilled with either 352 // `this.value` (if `this` is fulfilled), or the return value 353 // of `onRejected` (if `this` is rejected). 354 alias C = CommonType!(Unpromise!R, T); 355 356 auto next = new typeof(return); 357 358 void fulfillHandler() /*nothrow*/ 359 { 360 assert(this.state == PromiseState.fulfilled); 361 static if (is(C == void)) 362 next.fulfill(); 363 else 364 next.fulfill(this.value.tupleof); 365 } 366 367 void rejectHandler() /*nothrow*/ 368 { 369 assert(this.state == PromiseState.rejected); 370 if (onRejected) 371 { 372 try 373 next.resolve(onRejected(this.error)); 374 catch (F e) 375 next.reject(e); 376 } 377 else 378 next.reject(this.error); 379 } 380 381 final switch (this.state) 382 { 383 case PromiseState.pending: 384 case PromiseState.following: 385 handlers ~= PromiseHandler({ callSoon(&fulfillHandler); }, true, false); 386 handlers ~= PromiseHandler({ callSoon(&rejectHandler); }, false, true); 387 break; 388 case PromiseState.fulfilled: 389 callSoon(&fulfillHandler); 390 break; 391 case PromiseState.rejected: 392 callSoon(&rejectHandler); 393 break; 394 } 395 396 debug (ae_promise) markAsUsed(); 397 return next; 398 } 399 400 /// Registers a rejection handler. 401 /// Equivalent to `then(null, onRejected)`. 402 /// Similar to the `catch` method in JavaScript promises. 403 Promise!(R, F) except(R, F = E)(R delegate(E) onRejected) 404 { 405 return this.then(null, onRejected); 406 } 407 408 /// Registers a finalization handler, which is called when the 409 /// promise is resolved (either fulfilled or rejected). 410 /// Roughly equivalent to `then(value => onResolved(), error => onResolved())`. 411 /// Similar to the `finally` method in JavaScript promises. 412 Promise!(R, F) finish(R, F = E)(R delegate() onResolved) 413 { 414 assert(onResolved, "No onResolved delegate specified in .finish"); 415 416 auto next = new typeof(return); 417 418 void handler() /*nothrow*/ 419 { 420 assert(this.state == PromiseState.fulfilled || this.state == PromiseState.rejected); 421 try 422 next.resolve(onResolved()); 423 catch (F e) 424 next.reject(e); 425 } 426 427 final switch (this.state) 428 { 429 case PromiseState.pending: 430 case PromiseState.following: 431 handlers ~= PromiseHandler({ callSoon(&handler); }, true, true); 432 break; 433 case PromiseState.fulfilled: 434 case PromiseState.rejected: 435 callSoon(&handler); 436 break; 437 } 438 439 debug (ae_promise) markAsUsed(); 440 return next; 441 } 442 } 443 444 // (These declarations are top-level because they don't need to be templated.) 445 446 private enum PromiseState 447 { 448 pending, 449 following, 450 fulfilled, 451 rejected, 452 } 453 454 debug (ae_promise) 455 { 456 private final class LeakedPromiseError : Throwable { this() { super("Created here:"); } } 457 private extern (C) void _d_print_throwable(Throwable t) @nogc; 458 } 459 460 // The reverse operation is the `.resolve` overload. 461 private template Unpromise(P) 462 { 463 static if (is(P == Promise!(T, E), T, E)) 464 alias Unpromise = T; 465 else 466 alias Unpromise = P; 467 } 468 469 // This is the only non-"pure" part of this implementation. 470 private void callSoon(void delegate() dg) @safe nothrow { socketManager.onNextTick(dg); } 471 472 // This is just a simple instantiation test. 473 // The full test suite (D translation of the Promises/A+ conformance 474 // test) is here: https://github.com/CyberShadow/ae-promises-tests 475 nothrow unittest 476 { 477 static bool never; if (never) 478 { 479 Promise!int test; 480 test.then((int i) {}); 481 test.then((int i) {}, (Exception e) {}); 482 test.then(null, (Exception e) {}); 483 test.except((Exception e) {}); 484 test.finish({}); 485 test.fulfill(1); 486 test.reject(Exception.init); 487 488 Promise!void test2; 489 test2.then({}); 490 } 491 } 492 493 // Non-Exception based errors 494 unittest 495 { 496 static bool never; if (never) 497 { 498 static class OtherException : Exception 499 { 500 this() { super(null); } 501 } 502 503 Promise!(int, OtherException) test; 504 test.then((int i) {}); 505 test.then((int i) {}, (OtherException e) {}); 506 test.then(null, (OtherException e) {}); 507 test.except((OtherException e) {}); 508 test.fulfill(1); 509 test.reject(OtherException.init); 510 } 511 } 512 513 // Following 514 unittest 515 { 516 auto p = new Promise!void; 517 bool ok; 518 p.then({ 519 return resolve(true); 520 }).then((value) { 521 ok = value; 522 }); 523 p.fulfill(); 524 socketManager.loop(); 525 assert(ok); 526 } 527 528 // **************************************************************************** 529 530 /// Returns a new `Promise!void` which is resolved. 531 Promise!void resolve(E = Exception)() { auto p = new Promise!(void, E)(); p.fulfill(); return p; } 532 533 /// Returns a new `Promise` which is resolved with the given value. 534 Promise!T resolve(T, E = Exception)(T value) { auto p = new Promise!(T, E)(); p.fulfill(value); return p; } 535 536 /// Returns a new `Promise` which is rejected with the given reason. 537 Promise!(T, E) reject(T, E)(E reason) { auto p = new Promise!(T, E)(); p.reject(reason); return p; } 538 539 // **************************************************************************** 540 541 /// Return `true` if `P` is a `Promise` instantiation. 542 template isPromise(P) 543 { 544 static if (is(P == Promise!(T, E), T, E)) 545 enum isPromise = true; 546 else 547 enum isPromise = false; 548 } 549 550 /// Get the value type of the promise `P`, 551 /// i.e. its `T` parameter. 552 template PromiseValue(P) 553 { 554 /// 555 static if (is(P == Promise!(T, E), T, E)) 556 alias PromiseValue = T; 557 else 558 static assert(false); 559 } 560 561 /// Get the error type of the promise `P`, 562 /// i.e. its `E` parameter. 563 template PromiseError(P) 564 { 565 /// 566 static if (is(P == Promise!(T, E), T, E)) 567 alias PromiseError = E; 568 else 569 static assert(false); 570 } 571 572 /// Construct a new Promise type based on `P`, 573 /// if the given transformation was applied on the value type. 574 /// If `P` is a `void` Promise, then the returned promise 575 /// will also be `void`. 576 template PromiseValueTransform(P, alias transform) 577 if (is(P == Promise!(T, E), T, E)) 578 { 579 /// ditto 580 static if (is(P == Promise!(T, E), T, E)) 581 { 582 static if (is(T == void)) 583 private alias T2 = void; 584 else 585 private alias T2 = typeof({ T* value; return transform(*value); }()); 586 alias PromiseValueTransform = Promise!(T2, E); 587 } 588 } 589 590 // **************************************************************************** 591 592 /// Wait for all promises to be resolved, or for any to be rejected. 593 PromiseValueTransform!(P, x => [x]) all(P)(P[] promises) 594 if (is(P == Promise!(T, E), T, E)) 595 { 596 alias T = PromiseValue!P; 597 598 auto allPromise = new typeof(return); 599 600 typeof(return).ValueTuple results; 601 static if (!is(T == void)) 602 results[0] = new T[promises.length]; 603 604 if (promises.length) 605 { 606 size_t numResolved; 607 foreach (i, p; promises) 608 (i, p) { 609 p.dmd21804workaround.then((P.ValueTuple result) { 610 if (allPromise) 611 { 612 static if (!is(T == void)) 613 results[0][i] = result[0]; 614 if (++numResolved == promises.length) 615 allPromise.fulfill(results); 616 } 617 }, (error) { 618 if (allPromise) 619 { 620 allPromise.reject(error); 621 allPromise = null; // ignore successive resolves / rejects 622 } 623 }); 624 }(i, p); 625 } 626 else 627 allPromise.fulfill(results); 628 return allPromise; 629 } 630 631 nothrow unittest 632 { 633 import std.exception : assertNotThrown; 634 int result; 635 auto p1 = new Promise!int; 636 auto p2 = new Promise!int; 637 auto p3 = new Promise!int; 638 p2.fulfill(2); 639 auto pAll = all([p1, p2, p3]); 640 p1.fulfill(1); 641 pAll.dmd21804workaround.then((values) { result = values[0] + values[1] + values[2]; }); 642 p3.fulfill(3); 643 socketManager.loop().assertNotThrown; 644 assert(result == 6); 645 } 646 647 nothrow unittest 648 { 649 import std.exception : assertNotThrown; 650 int called; 651 auto p1 = new Promise!void; 652 auto p2 = new Promise!void; 653 auto p3 = new Promise!void; 654 p2.fulfill(); 655 auto pAll = all([p1, p2, p3]); 656 p1.fulfill(); 657 pAll.then({ called = true; }); 658 socketManager.loop().assertNotThrown; 659 assert(!called); 660 p3.fulfill(); 661 socketManager.loop().assertNotThrown; 662 assert(called); 663 } 664 665 nothrow unittest 666 { 667 import std.exception : assertNotThrown; 668 Promise!void[] promises; 669 auto pAll = all(promises); 670 bool called; 671 pAll.then({ called = true; }); 672 socketManager.loop().assertNotThrown; 673 assert(called); 674 } 675 676 private template AllResultImpl(size_t promiseIndex, size_t resultIndex, Promises...) 677 { 678 static if (Promises.length == 0) 679 { 680 alias TupleMembers = AliasSeq!(); 681 enum size_t[] mapping = []; 682 } 683 else 684 static if (is(PromiseValue!(Promises[0]) == void)) 685 { 686 alias Next = AllResultImpl!(promiseIndex + 1, resultIndex, Promises[1..$]); 687 alias TupleMembers = Next.TupleMembers; 688 enum size_t[] mapping = [size_t(-1)] ~ Next.mapping; 689 } 690 else 691 { 692 alias Next = AllResultImpl!(promiseIndex + 1, resultIndex + 1, Promises[1..$]); 693 alias TupleMembers = AliasSeq!(PromiseValue!(Promises[0]), Next.TupleMembers); 694 enum size_t[] mapping = [resultIndex] ~ Next.mapping; 695 } 696 } 697 698 // Calculates a value type for a Promise suitable to hold the values of the given promises. 699 // void-valued promises are removed; an empty list is converted to void. 700 // Also calculates an index map from Promises indices to tuple member indices. 701 private template AllResult(Promises...) 702 { 703 alias Impl = AllResultImpl!(0, 0, Promises); 704 static if (Impl.TupleMembers.length == 0) 705 alias ResultType = void; 706 else 707 { 708 import std.typecons : Tuple; 709 alias ResultType = Tuple!(Impl.TupleMembers); 710 } 711 } 712 713 private alias PromiseBox(P) = P.Box; 714 715 /// Heterogeneous variant, which resolves to a tuple. 716 /// void promises' values are omitted from the result tuple. 717 /// If all promises are void, then so is the result. 718 Promise!(AllResult!Promises.ResultType) all(Promises...)(Promises promises) 719 if (allSatisfy!(isPromise, Promises)) 720 { 721 AllResult!Promises.Impl.TupleMembers results; 722 723 auto allPromise = new typeof(return); 724 725 static if (promises.length) 726 { 727 size_t numResolved; 728 foreach (i, p; promises) 729 { 730 alias P = typeof(p); 731 alias T = PromiseValue!P; 732 p.dmd21804workaround.then((P.ValueTuple result) { 733 if (allPromise) 734 { 735 static if (!is(T == void)) 736 results[AllResult!Promises.Impl.mapping[i]] = result[0]; 737 if (++numResolved == promises.length) 738 { 739 static if (AllResult!Promises.Impl.TupleMembers.length) 740 { 741 import std.typecons : tuple; 742 allPromise.fulfill(tuple(results)); 743 } 744 else 745 allPromise.fulfill(); 746 } 747 } 748 }, (error) { 749 if (allPromise) 750 { 751 allPromise.reject(error); 752 allPromise = null; // ignore successive resolves / rejects 753 } 754 }); 755 } 756 } 757 else 758 allPromise.fulfill(); 759 return allPromise; 760 } 761 762 nothrow unittest 763 { 764 import std.exception : assertNotThrown; 765 import ae.utils.meta : I; 766 767 int result; 768 auto p1 = new Promise!byte; 769 auto p2 = new Promise!void; 770 auto p3 = new Promise!int; 771 p2.fulfill(); 772 auto pAll = all(p1, p2, p3); 773 p1.fulfill(1); 774 pAll.dmd21804workaround 775 .then(values => values.expand.I!((v1, v3) { 776 result = v1 + v3; 777 })); 778 p3.fulfill(3); 779 socketManager.loop().assertNotThrown; 780 assert(result == 4); 781 } 782 783 nothrow unittest 784 { 785 bool ok; 786 import std.exception : assertNotThrown; 787 auto p1 = new Promise!void; 788 auto p2 = new Promise!void; 789 auto p3 = new Promise!void; 790 p2.fulfill(); 791 auto pAll = all(p1, p2, p3); 792 p1.fulfill(); 793 pAll.then({ ok = true; }); 794 socketManager.loop().assertNotThrown; 795 assert(!ok); 796 p3.fulfill(); 797 socketManager.loop().assertNotThrown; 798 assert(ok); 799 } 800 801 // **************************************************************************** 802 803 Promise!(T, E) require(T, E)(ref Promise!(T, E) p, lazy Promise!(T, E) lp) 804 { 805 if (!p) 806 p = lp; 807 return p; 808 } 809 810 unittest 811 { 812 Promise!int p; 813 int work; 814 Promise!int getPromise() 815 { 816 return p.require({ 817 work++; 818 return resolve(1); 819 }()); 820 } 821 int done; 822 getPromise().then((n) { done += 1; }); 823 getPromise().then((n) { done += 1; }); 824 socketManager.loop(); 825 assert(work == 1 && done == 2); 826 } 827 828 /// Ordered promise queue, supporting asynchronous enqueuing / fulfillment. 829 struct PromiseQueue(T, E = Exception) 830 { 831 private alias P = Promise!(T, E); 832 833 private P[] fulfilled, waiting; 834 835 import ae.utils.array : queuePush, queuePop; 836 837 /// Retrieve the next fulfilled promise, or enqueue a waiting one. 838 P waitOne() 839 { 840 if (fulfilled.length) 841 return fulfilled.queuePop(); 842 843 auto p = new P; 844 waiting.queuePush(p); 845 return p; 846 } 847 848 /// Fulfill one waiting promise, or enqueue a fulfilled one. 849 P fulfillOne(typeof(P.Box.tupleof) value) 850 { 851 if (waiting.length) 852 { 853 waiting.queuePop.fulfill(value); 854 return null; 855 } 856 857 auto p = new P; 858 p.fulfill(value); 859 fulfilled.queuePush(p); 860 return p; 861 } 862 } 863 864 unittest 865 { 866 PromiseQueue!int q; 867 q.fulfillOne(1); 868 q.fulfillOne(2); 869 int[] result; 870 q.waitOne().then((i) { result ~= i; }); 871 q.waitOne().then((i) { result ~= i; }); 872 socketManager.loop(); 873 assert(result == [1, 2]); 874 } 875 876 unittest 877 { 878 PromiseQueue!int q; 879 int[] result; 880 q.waitOne().then((i) { result ~= i; }); 881 q.waitOne().then((i) { result ~= i; }); 882 q.fulfillOne(1); 883 q.fulfillOne(2); 884 socketManager.loop(); 885 assert(result == [1, 2]); 886 }