/**
 * Promise concurrency tools.
 *
 * License:
 *   This Source Code Form is subject to the terms of
 *   the Mozilla Public License, v. 2.0. If a copy of
 *   the MPL was not distributed with this file, You
 *   can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Authors:
 *   Vladimir Panteleev <ae@cy.md>
 */

module ae.utils.promise.concurrency;

import core.thread;

import std.traits;
import std.typecons : No;

import ae.net.sync;
import ae.utils.aa : updateVoid;
import ae.utils.meta;
import ae.utils.promise;

/// Evaluate `value` in a new thread.
/// The promise is resolved in the current (calling) thread.
///
/// Params:
///   value = lazily-evaluated expression; it is evaluated inside the
///           newly started thread, not in the caller.
///
/// Returns:
///   A `Promise!(T, E)` which is fulfilled with the result of `value`,
///   or rejected with the `E` thrown while evaluating it. Settlement is
///   dispatched back to the calling thread through a `ThreadAnchor`.
///
/// Note: only throwables of type `E` are caught and turned into a
/// rejection; anything else propagates out of the worker thread.
Promise!(T, E) threadAsync(T, E = Exception)(lazy T value)
{
	// Use the same error type as the declared return type, so that a
	// non-default `E` type-checks and is honored.
	auto p = new Promise!(T, E);
	auto mainThread = new ThreadAnchor(No.daemon);
	Thread t;
	t = new Thread({
		try
		{
			// `voidStruct` wraps the result so that `T == void` also works;
			// `.tupleof` below expands to zero or one arguments.
			auto result = value.voidStruct;
			mainThread.runAsync({
				t.join();
				p.fulfill(result.tupleof);
			});
		}
		catch (E e)
			mainThread.runAsync({
				t.join();
				p.reject(e);
			});
		// All callbacks for this anchor have been queued by now.
		mainThread.close();
	});
	t.start();
	return p;
}

unittest
{
	import ae.net.asockets : socketManager;

	int ok;

	Thread.sleep(1.msecs).threadAsync.then(() { ok++; });
	"foo".threadAsync.dmd21804workaround.then((s) { ok += s == "foo"; });
	(){throw new Exception("yolo");}().threadAsync.then((){}, (e) { ok += e.msg == "yolo"; });

	socketManager.loop();
	assert(ok == 3, [cast(char)('0'+ok)]);
}

/// Given a function `fun` which returns a promise,
/// globally memoize it (across all threads),
/// so that at most one invocation of `fun` with the given parameters
/// is performed during the program's lifetime.
/// If a `fun` promise is in progress (incl. if started
/// by another thread), wait for it to finish first.
template globallyMemoized(alias fun)
if (is(ReturnType!fun == Promise!(T, E), T, E))
{
	alias P = ReturnType!fun;
	static if (is(FunctionTypeOf!fun PT == __parameters))
	{
		P globallyMemoized(PT args)
		{
			// Outside of the critical section below, every cache entry
			// is in exactly one of these states:
			// - nonexistent (nobody asked for this result yet)
			// - in progress (some thread has started the work)
			// - resolved    (the work is known to have finished)

			// Cache key: the full argument list of `fun`.
			static struct Key
			{
				Parameters!fun args;
			}

			// Cache value: the promise plus the thread that owns it.
			static struct Entry
			{
				ThreadAnchor ownerThread;
				P realPromise; // only safe to access in ownerThread
				bool resolved; // avoid thread roundtrip, as an optimization
			}

			// One cache per instantiation, shared by all threads.
			__gshared Entry*[Key] cache;

			P result;
			auto key = Key(args);

			synchronized cache.updateVoid(key,
				// No entry yet: start the work in this thread, which
				// thereby becomes the owner of the promise.
				{
					result = fun(args);
					return new Entry(thisThread, result);
				},
				// Entry exists: decide how to hand its promise to the caller.
				(ref Entry* entry)
				{
					// A promise known to be settled is effectively
					// immutable, so it can be returned as is.
					if (entry.resolved)
					{
						result = entry.realPromise;
						return;
					}

					auto callerThread = thisThread;

					// The caller is the owning thread: the real promise
					// can be returned directly.
					if (entry.ownerThread is callerThread)
					{
						result = entry.realPromise;
						return;
					}

					// Otherwise, hop to the owning thread, chain onto the
					// real promise there, and forward the outcome back to
					// the calling thread, where the proxy promise is settled.
					auto proxy = new P;
					result = proxy;
					entry.ownerThread.runAsync({
						entry.realPromise.then(
							(P.ValueTuple value)
							{
								entry.resolved = true;
								callerThread.runAsync({
									proxy.fulfill(value);
								});
							},
							(error)
							{
								entry.resolved = true;
								callerThread.runAsync({
									proxy.reject(error);
								});
							}
						);
					});
				});

			return result;
		}
	}
	else
		static assert(false, "Not a function: " ~ __traits(identifier, fun));
}

// Compile-only check: the template instantiates for a promise-returning function.
unittest
{
	Promise!void funImpl() { return resolve(); }
	alias fun = globallyMemoized!funImpl;
}