1 /**
2  * Promise concurrency tools.
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <ae@cy.md>
12  */
13 
14 module ae.utils.promise.concurrency;
15 
16 import core.thread;
17 
18 import std.traits;
19 import std.typecons : No;
20 
21 import ae.net.sync;
22 import ae.utils.aa : updateVoid;
23 import ae.utils.meta;
24 import ae.utils.promise;
25 
/// Evaluate `value` in a new thread.
/// The promise is resolved in the current (calling) thread.
///
/// Params:
///   value = Lazily evaluated expression; it is evaluated inside
///           the newly started thread.
/// Returns:
///   A promise which is fulfilled with the result of `value`,
///   or rejected with the `E` thrown while evaluating it.
Promise!(T, E) threadAsync(T, E = Exception)(lazy T value)
{
	// Instantiate the promise with `E` too, so that the object matches
	// the declared return type when a non-default error type is used.
	auto p = new Promise!(T, E);
	auto mainThread = new ThreadAnchor(No.daemon);
	Thread t;
	t = new Thread({
		try
		{
			// `voidStruct` presumably wraps a possibly-`void` result in a
			// struct so it can be stored and expanded via `.tupleof` below
			// (helper is defined elsewhere - confirm).
			auto result = value.voidStruct;
			mainThread.runAsync({
				// Wait for the worker thread to finish before settling.
				t.join();
				p.fulfill(result.tupleof);
			});
		}
		catch (E e) // must be `E`, as `p.reject` accepts the promise's error type
			mainThread.runAsync({
				t.join();
				p.reject(e);
			});
		mainThread.close();
	});
	t.start();
	return p;
}
52 
unittest
{
	import ae.net.asockets : socketManager;

	// Counts the scenarios whose handler observed the expected outcome.
	int passed;

	// Expression of type `void`: the fulfillment handler takes no arguments.
	Thread.sleep(1.msecs).threadAsync.then(() { passed++; });

	// Expression producing a value: the handler receives that value.
	"foo".threadAsync.dmd21804workaround.then((s) {
		if (s == "foo")
			passed++;
	});

	// Expression which throws: the rejection handler receives the exception.
	(){throw new Exception("yolo");}().threadAsync.then((){}, (e) {
		if (e.msg == "yolo")
			passed++;
	});

	// Run the event loop until all pending work has completed.
	socketManager.loop();
	assert(passed == 3, [cast(char)('0'+passed)]);
}
66 
/// Wrap `fun`, a function which returns a promise, in a global
/// (shared across all threads) memoization layer, so that `fun`
/// is invoked at most once with any given set of parameters
/// during the program's lifetime.
/// If a `fun` promise for the same parameters is still in progress
/// (including when it was started by another thread),
/// the returned promise settles once that one has finished.
template globallyMemoized(alias fun)
if (is(ReturnType!fun == Promise!(T, E), T, E))
{
	alias P = ReturnType!fun;
	static if (is(FunctionTypeOf!fun PT == __parameters))
	{
		P globallyMemoized(PT args)
		{
			// Outside of updates (which happen with the lock held),
			// every cache entry is in exactly one of these states:
			// - nonexistent: nobody has asked for this result yet
			// - in progress: somebody has started the work
			// - resolved: the work has completed
			static struct Key { Parameters!fun args; }
			static struct Entry
			{
				ThreadAnchor ownerThread; // thread which invoked `fun`
				P realPromise;            // only safe to access in ownerThread
				bool resolved;            // optimization: lets us skip the thread roundtrip
			}
			__gshared Entry*[Key] cache;

			auto lookupKey = Key(args);
			P result;

			synchronized
			{
				cache.updateVoid(lookupKey,
					() {
						// First request for these arguments: start the work
						// here, and record the current thread as its owner.
						result = fun(args);
						auto fresh = new Entry(thisThread, result);
						return fresh;
					},
					(ref Entry* entry)
					{
						auto cached = entry;

						// A settled promise is effectively immutable,
						// so it can be handed out to any thread as is.
						if (cached.resolved)
						{
							result = cached.realPromise;
							return;
						}

						// The owning thread may use its own promise directly.
						auto callerThread = thisThread;
						if (cached.ownerThread is callerThread)
						{
							result = cached.realPromise;
							return;
						}

						// Otherwise, hop over to the thread which created the
						// promise, attach a continuation there, and have that
						// continuation hop back to the calling thread in order
						// to settle the promise which we return.
						auto proxy = new P;
						result = proxy;
						cached.ownerThread.runAsync({
							cached.realPromise.then((P.ValueTuple value) {
								cached.resolved = true;
								callerThread.runAsync({
									proxy.fulfill(value);
								});
							}, (err) {
								cached.resolved = true;
								callerThread.runAsync({
									proxy.reject(err);
								});
							});
						});
					});
			}
			return result;
		}
	}
	else
		static assert(false, "Not a function: " ~ __traits(identifier, fun));
}
153 
unittest
{
	// Compile-only check: the template must instantiate
	// for a parameterless function returning `Promise!void`.
	Promise!void immediate()
	{
		return resolve();
	}
	alias memoized = globallyMemoized!immediate;
}