/**
 * async/await-like API for asynchronous tasks combining promises and
 * fibers.
 *
 * License:
 *   This Source Code Form is subject to the terms of
 *   the Mozilla Public License, v. 2.0. If a copy of
 *   the MPL was not distributed with this file, You
 *   can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Authors:
 *   Vladimir Panteleev <ae@cy.md>
 */

module ae.utils.promise.await;

import core.thread : Fiber;

import ae.net.asockets : socketManager;
import ae.utils.promise;

/// Default size passed to the `Fiber` constructor by `async`
/// when the caller does not specify one.
enum defaultFiberSize = 64 * 1024;

/// Evaluates `task` in a new fiber, and returns a promise which is
/// fulfilled when `task` exits. `task` may use `await` to block on
/// other promises.
/// Params:
///   task = The work to run inside the new fiber.
///   size = Size passed to the `Fiber` constructor.
/// Returns:
///   A promise fulfilled with the result of `task`, or rejected with
///   the `E` thrown by `task`. Throwables which are not an `E` are
///   not caught by this function.
// TODO: is using lazy OK for this? https://issues.dlang.org/show_bug.cgi?id=23923
Promise!(T, E) async(T, E = Exception)(lazy T task, size_t size = defaultFiberSize)
if (!is(T == return))
{
	// Forward E explicitly, so that the resulting promise type
	// matches the declared return type for any E, not just Exception.
	return asyncImpl!(T, E)({ return task; }, size);
}

/// ditto
Promise!(T, E) async(T, E = Exception)(T delegate() task, size_t size = defaultFiberSize)
if (!is(T == return))
{
	return asyncImpl!(T, E)(task, size);
}

/// ditto
Promise!(T, E) async(T, E = Exception)(T function() task)
if (!is(T == return))
{
	import std.functional : toDelegate;
	return asyncImpl!(T, E)(task.toDelegate, defaultFiberSize);
}

// Shared implementation of all `async` overloads.
// Creates the promise using the caller's error type `E` (the
// overloads are declared as returning `Promise!(T, E)`), runs `task`
// in a new fiber, and settles the promise from inside that fiber.
// This is a separately-named helper, rather than one of the `async`
// overloads, so that forwarding with explicit template arguments
// never has to be resolved against the `lazy T` overload.
private Promise!(T, E) asyncImpl(T, E)(T delegate() task, size_t size)
{
	auto p = new Promise!(T, E);
	auto f = new Fiber({
		try
		{
			static if (is(T == void))
			{
				task();
				p.fulfill();
			}
			else
				p.fulfill(task());
		}
		catch (E e)
			p.reject(e);
	}, size);
	// Start the task immediately; control returns here when the
	// fiber yields or finishes.
	f.call();
	return p;
}

/// Synchronously waits until the promise `p` is fulfilled.
/// Can only be called in a fiber.
T await(T, E)(Promise!(T, E) p)
{
	// Zero or one values, depending on whether T is void.
	alias Values = Promise!T.ValueTuple;

	Values outcome;
	E failure;

	auto self = Fiber.getThis();
	assert(self, "await called while not in a fiber");

	// Either handler records the result and then resumes this fiber.
	p.then(
		(Values value)
		{
			outcome = value;
			self.call();
		},
		(E error)
		{
			failure = error;
			self.call();
		},
	);

	// Suspend until one of the handlers above resumes us.
	Fiber.yield();

	if (failure !is null)
		throw failure;

	static if (!is(T == void))
		return outcome[0];
}

///
debug(ae_unittest) unittest
{
	import ae.net.asockets : socketManager;

	auto one = resolve(1);
	auto two = resolve(2);

	int sum;
	// The argument is lazy, so both `await` calls run inside the new fiber.
	async(one.await + two.await).then((value) { sum = value; });
	socketManager.loop();
	assert(sum == 3);
}

// Compile-only check: `await` accepts promises of void tasks.
debug(ae_unittest) unittest
{
	if (false)
	{
		async({}).await();
		async({}()).await();
	}
}

/// Synchronously starts an event loop and waits for it to exit.
/// Assumes that the promise `p` is resolved during the event loop;
/// propagates any return value or exception to the caller.
T awaitSync(T, E)(Promise!(T, E) p)
{
	// Zero or one values, depending on whether T is void.
	alias Values = Promise!T.ValueTuple;

	Values outcome;
	E failure;
	bool settled;

	p.then(
		(Values value)
		{
			settled = true;
			outcome = value;
		},
		(E error)
		{
			settled = true;
			failure = error;
		},
	);

	// Run the event loop; one of the handlers above is expected
	// to have been called by the time it returns.
	socketManager.loop();

	assert(settled, "Event loop exited but the promise remained unresolved");

	if (failure !is null)
		throw failure;

	static if (!is(T == void))
		return outcome[0];
}

// Compile-only check: `awaitSync` accepts promises of void tasks.
debug(ae_unittest) unittest
{
	if (false)
	{
		async({}).awaitSync();
		async({}()).awaitSync();
	}
}