1 /** 2 * Promise concurrency tools. 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Vladimir Panteleev <ae@cy.md> 12 */ 13 14 module ae.utils.promise.concurrency; 15 16 import core.thread; 17 18 import std.traits; 19 import std.typecons : No; 20 21 import ae.net.sync; 22 import ae.utils.aa : updateVoid; 23 import ae.utils.meta; 24 import ae.utils.promise; 25 26 /// Evaluate `value` in a new thread. 27 /// The promise is resolved in the current (calling) thread. 28 // TODO: is using lazy OK for this? https://issues.dlang.org/show_bug.cgi?id=23923 29 Promise!(T, E) threadAsync(T, E = Exception)(lazy T value) 30 if (!is(T == return)) 31 { 32 return threadAsync({ return value; }); 33 } 34 35 /// ditto 36 Promise!(T, E) threadAsync(T, E = Exception)(T delegate() value) 37 if (!is(T == return)) 38 { 39 auto p = new Promise!T; 40 auto mainThread = new ThreadAnchor(No.daemon); 41 Thread t; 42 t = new Thread({ 43 try 44 { 45 auto result = value().voidStruct; 46 mainThread.runAsync({ 47 t.join(); 48 p.fulfill(result.tupleof); 49 }); 50 } 51 catch (Exception e) 52 mainThread.runAsync({ 53 t.join(); 54 p.reject(e); 55 }); 56 mainThread.close(); 57 }); 58 t.start(); 59 return p; 60 } 61 62 /// ditto 63 Promise!(T, E) threadAsync(T, E = Exception)(T function() value) 64 if (!is(T == return)) 65 { 66 import std.functional : toDelegate; 67 return threadAsync(value.toDelegate); 68 } 69 70 unittest 71 { 72 import ae.net.asockets : socketManager; 73 74 int ok; 75 76 Thread.sleep(1.msecs).threadAsync.then(() { ok++; }); 77 "foo".threadAsync.dmd21804workaround.then((s) { ok += s == "foo"; }); 78 (){throw new Exception("yolo");}().threadAsync.then((){}, (e) { ok += e.msg == "yolo"; }); 79 80 socketManager.loop(); 81 assert(ok == 3, [cast(char)('0'+ok)]); 82 } 83 84 // **************************************************************************** 85 86 /// Given a function `fun` which returns a promise, 87 /// globally memoize it (across all threads), 88 /// so that at most one invocation of `fun` with the given parameters 89 /// is invoked during the program's lifetime. 90 /// If a `fun` promise is in progress (incl. if started 91 /// by another thread), wait for it to finish first. 92 template globallyMemoized(alias fun) 93 if (is(ReturnType!fun == Promise!(T, E), T, E)) 94 { 95 alias P = ReturnType!fun; 96 static if (is(FunctionTypeOf!fun PT == __parameters)) 97 { 98 P globallyMemoized(PT args) 99 { 100 // At any point in time (excluding times when it is 101 // being updated, i.e. when the mutex lock is held), 102 // a cache entry can be in one of three states: 103 // - nonexistent (no one tried to calculate this result yet) 104 // - in progress (someone started work on this) 105 // - resolved (the work has finished) 106 static struct Key 107 { 108 Parameters!fun args; 109 } 110 static struct Entry 111 { 112 ThreadAnchor ownerThread; 113 P realPromise; // only safe to access in ownerThread 114 bool resolved; // avoid thread roundtrip, as an optimization 115 } 116 __gshared Entry*[Key] cache; 117 118 auto key = Key(args); 119 120 P localPromise; 121 synchronized cache.updateVoid(key, 122 { 123 localPromise = fun(args); 124 return new Entry(thisThread, localPromise); 125 }, 126 (ref Entry* entry) 127 { 128 if (entry.resolved) 129 { 130 // If we know that the promise is settled, 131 // then it is effectively immutable, 132 // and it is safe to return it as is. 133 localPromise = entry.realPromise; 134 } 135 else 136 { 137 auto localThread = thisThread; 138 if (entry.ownerThread is localThread) 139 { 140 // We are in the thread that owns this promise. 141 // Just return it. 142 localPromise = entry.realPromise; 143 } 144 else 145 { 146 // Go to the thread that created the promise, 147 // and append a continuation which goes back to our thread 148 // and resolves the returned promise. 149 localPromise = new P; 150 entry.ownerThread.runAsync({ 151 entry.realPromise.then((P.ValueTuple value) { 152 entry.resolved = true; 153 localThread.runAsync({ 154 localPromise.fulfill(value); 155 }); 156 }, (error) { 157 entry.resolved = true; 158 localThread.runAsync({ 159 localPromise.reject(error); 160 }); 161 }); 162 }); 163 } 164 } 165 }); 166 return localPromise; 167 } 168 } 169 else 170 static assert(false, "Not a function: " ~ __traits(identifier, fun)); 171 } 172 173 unittest 174 { 175 Promise!void funImpl() { return resolve(); } 176 alias fun = globallyMemoized!funImpl; 177 } 178 179 // **************************************************************************** 180 181 /// Runs tasks asynchronously in an ordered manner. 182 /// For each `put` call, return a `Promise` which 183 /// resolves to the given delegate's return value. 184 /// The `taskFun` is evaluated in a separate thread. 185 /// Unlike `threadAsync`, at most one task will execute 186 /// at any given time (per `AsyncQueue` instance), 187 /// they will be executed in the order of the `put` calls, 188 /// and the promises will be resolved in the main thread 189 /// in the same order. 190 final class AsyncQueue(T, E = Exception) 191 { 192 this() 193 { 194 // Note: std.concurrency can't support daemon tasks 195 anchor = new ThreadAnchor(No.daemon); 196 tid = spawn(&threadFunc, thisTid); 197 } /// 198 199 Promise!(T, E) put(T delegate() taskFun) 200 { 201 auto promise = new Promise!(T, E); 202 tid.send(cast(immutable)Task(taskFun, promise, anchor)); 203 return promise; 204 } /// 205 206 /// Close the queue. Must be called to free up resources 207 /// (thread and message queue). 208 void close() 209 { 210 tid.send(cast(immutable)EOF(anchor)); 211 anchor = null; 212 } 213 214 private: 215 import std.concurrency : spawn, send, receive, Tid, thisTid; 216 217 ThreadAnchor anchor; 218 Tid tid; 219 220 struct Task 221 { 222 T delegate() fun; 223 Promise!(T, E) promise; 224 ThreadAnchor anchor; 225 } 226 struct EOF 227 { 228 ThreadAnchor anchor; 229 } 230 231 static void threadFunc(Tid _) 232 { 233 bool done; 234 while (!done) 235 { 236 receive( 237 (immutable Task immutableTask) 238 { 239 auto task = cast()immutableTask; 240 try 241 { 242 auto result = task.fun().voidStruct; 243 task.anchor.runAsync({ 244 task.promise.fulfill(result.tupleof); 245 }); 246 } 247 catch (E e) 248 task.anchor.runAsync({ 249 task.promise.reject(e); 250 }); 251 }, 252 (immutable EOF immutableEOF) 253 { 254 auto eof = cast()immutableEOF; 255 eof.anchor.close(); 256 done = true; 257 }, 258 ); 259 } 260 } 261 } 262 263 unittest 264 { 265 import ae.net.asockets : socketManager; 266 267 int[] result; 268 { 269 auto queue = new AsyncQueue!void; 270 scope(exit) queue.close(); 271 auto taskFun(int n) { return () { Thread.sleep(n.msecs); result ~= n; }; } 272 queue.put(taskFun(200)); 273 queue.put(taskFun(100)); 274 } 275 socketManager.loop(); 276 assert(result == [200, 100]); 277 }