/**
 * Promise concurrency tools.
 *
 * License:
 *   This Source Code Form is subject to the terms of
 *   the Mozilla Public License, v. 2.0. If a copy of
 *   the MPL was not distributed with this file, You
 *   can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Authors:
 *   Vladimir Panteleev <ae@cy.md>
 */

module ae.utils.promise.concurrency;

import std.traits;

import ae.net.sync;
import ae.utils.aa : updateVoid;
import ae.utils.promise;

/// Given a function `fun` which returns a promise,
/// globally memoize it (across all threads),
/// so that `fun` is invoked at most once with any given parameters
/// during the program's lifetime.
/// If a `fun` promise is in progress (incl. if started
/// by another thread), wait for it to finish first.
///
/// Params:
///   args = the arguments to pass to `fun`; they also form the cache key.
///
/// Returns:
///   A promise usable from the calling thread. This is either the promise
///   returned by `fun` itself (first call, call from the thread which made
///   the first call, or entry already marked as resolved), or a new promise
///   which is settled in the calling thread with the same value / error.
template globallyMemoized(alias fun)
if (is(ReturnType!fun == Promise!(T, E), T, E))
{
	alias P = ReturnType!fun;
	static if (is(FunctionTypeOf!fun PT == __parameters))
	{
		P globallyMemoized(PT args)
		{
			// At any point in time (excluding times when it is
			// being updated, i.e. when the mutex lock is held),
			// a cache entry can be in one of three states:
			// - nonexistent (no one tried to calculate this result yet)
			// - in progress (someone started work on this)
			// - resolved    (the work has finished)
			static struct Key
			{
				Parameters!fun args;
			}
			static struct Entry
			{
				ThreadAnchor ownerThread; // thread which invoked `fun`
				P realPromise; // only safe to access in ownerThread
				bool resolved; // avoid thread roundtrip, as an optimization
				// NOTE(review): `resolved` is assigned from the promise
				// continuations below, which presumably run outside of the
				// `synchronized` statement - confirm whether this needs
				// an atomic store / acquire load on weakly-ordered targets.
			}
			__gshared Entry*[Key] cache;

			auto key = Key(args);

			P localPromise;
			synchronized cache.updateVoid(key,
				{
					// First request for this key: do the work in this
					// thread, and record this thread as the owner.
					localPromise = fun(args);
					return new Entry(thisThread, localPromise);
				},
				(ref Entry* slot)
				{
					// `slot` refers to storage inside the associative array.
					// Take a copy of the pointer, so that the delegates
					// created below (which run after this call returns)
					// hold on to the heap-allocated Entry itself instead of
					// to a reference into the associative array's internals.
					Entry* entry = slot;

					if (entry.resolved)
					{
						// If we know that the promise is settled,
						// then it is effectively immutable,
						// and it is safe to return it as is.
						localPromise = entry.realPromise;
					}
					else
					{
						auto localThread = thisThread;
						if (entry.ownerThread is localThread)
						{
							// We are in the thread that owns this promise.
							// Just return it.
							localPromise = entry.realPromise;
						}
						else
						{
							// Go to the thread that created the promise,
							// and append a continuation which goes back to our thread
							// and resolves the returned promise.
							localPromise = new P;
							entry.ownerThread.runAsync({
								entry.realPromise.then((P.ValueTuple value) {
									entry.resolved = true;
									localThread.runAsync({
										localPromise.fulfill(value);
									});
								}, (error) {
									entry.resolved = true;
									localThread.runAsync({
										localPromise.reject(error);
									});
								});
							});
						}
					}
				});
			return localPromise;
		}
	}
	else
		static assert(false, "Not a function: " ~ __traits(identifier, fun));
}

// Check that the template can be instantiated with
// a parameterless function returning `Promise!void`.
unittest
{
	Promise!void funImpl() { return resolve(); }
	alias fun = globallyMemoized!funImpl;
}