1 /**
2  * Promise concurrency tools.
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <ae@cy.md>
12  */
13 
14 module ae.utils.promise.concurrency;
15 
16 import std.traits;
17 
18 import ae.net.sync;
19 import ae.utils.aa : updateVoid;
20 import ae.utils.promise;
21 
22 /// Given a function `fun` which returns a promise,
23 /// globally memoize it (across all threads),
24 /// so that at most one invocation of `fun` with the given parameters
25 /// is invoked during the program's lifetime.
26 /// If a `fun` promise is in progress (incl. if started
27 /// by another thread), wait for it to finish first.
28 template globallyMemoized(alias fun)
29 if (is(ReturnType!fun == Promise!(T, E), T, E))
30 {
31 	alias P = ReturnType!fun;
32 	static if (is(FunctionTypeOf!fun PT == __parameters))
33 	{
34 		P globallyMemoized(PT args)
35 		{
36 			// At any point in time (excluding times when it is
37 			// being updated, i.e. when the mutex lock is held),
38 			// a cache entry can be in one of three states:
39 			// - nonexistent (no one tried to calculate this result yet)
40 			// - in progress (someone started work on this)
41 			// - resolved (the work has finished)
42 			static struct Key
43 			{
44 				Parameters!fun args;
45 			}
46 			static struct Entry
47 			{
48 				ThreadAnchor ownerThread;
49 				P realPromise; // only safe to access in ownerThread
50 				bool resolved; // avoid thread roundtrip, as an optimization
51 			}
52 			__gshared Entry*[Key] cache;
53 
54 			auto key = Key(args);
55 
56 			P localPromise;
57 			synchronized cache.updateVoid(key,
58 				{
59 					localPromise = fun(args);
60 					return new Entry(thisThread, localPromise);
61 				},
62 				(ref Entry* entry)
63 				{
64 					if (entry.resolved)
65 					{
66 						// If we know that the promise is settled,
67 						// then it is effectively immutable,
68 						// and it is safe to return it as is.
69 						localPromise = entry.realPromise;
70 					}
71 					else
72 					{
73 						auto localThread = thisThread;
74 						if (entry.ownerThread is localThread)
75 						{
76 							// We are in the thread that owns this promise.
77 							// Just return it.
78 							localPromise = entry.realPromise;
79 						}
80 						else
81 						{
82 							// Go to the thread that created the promise,
83 							// and append a continuation which goes back to our thread
84 							// and resolves the returned promise.
85 							localPromise = new P;
86 							entry.ownerThread.runAsync({
87 								entry.realPromise.then((P.ValueTuple value) {
88 									entry.resolved = true;
89 									localThread.runAsync({
90 										localPromise.fulfill(value);
91 									});
92 								}, (error) {
93 									entry.resolved = true;
94 									localThread.runAsync({
95 										localPromise.reject(error);
96 									});
97 								});
98 							});
99 						}
100 					}
101 				});
102 			return localPromise;
103 		}
104 	}
105 	else
106 		static assert(false, "Not a function: " ~ __traits(identifier, fun));
107 }
108 
109 unittest
110 {
111 	Promise!void funImpl() { return resolve(); }
112 	alias fun = globallyMemoized!funImpl;
113 }