1 /**
2  * Promise concurrency tools.
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <ae@cy.md>
12  */
13 
14 module ae.utils.promise.concurrency;
15 
16 import core.thread;
17 
18 import std.traits;
19 import std.typecons : No;
20 
21 import ae.net.sync;
22 import ae.utils.aa : updateVoid;
23 import ae.utils.meta;
24 import ae.utils.promise;
25 
26 /// Evaluate `value` in a new thread.
27 /// The promise is resolved in the current (calling) thread.
28 // TODO: is using lazy OK for this? https://issues.dlang.org/show_bug.cgi?id=23923
29 Promise!(T, E) threadAsync(T, E = Exception)(lazy T value)
30 if (!is(T == return))
31 {
32 	return threadAsync({ return value; });
33 }
34 
35 /// ditto
36 Promise!(T, E) threadAsync(T, E = Exception)(T delegate() value)
37 if (!is(T == return))
38 {
39 	auto p = new Promise!T;
40 	auto mainThread = new ThreadAnchor(No.daemon);
41 	Thread t;
42 	t = new Thread({
43 		try
44 		{
45 			auto result = value().voidStruct;
46 			mainThread.runAsync({
47 				t.join();
48 				p.fulfill(result.tupleof);
49 			});
50 		}
51 		catch (Exception e)
52 			mainThread.runAsync({
53 				t.join();
54 				p.reject(e);
55 			});
56 		mainThread.close();
57 	});
58 	t.start();
59 	return p;
60 }
61 
62 /// ditto
63 Promise!(T, E) threadAsync(T, E = Exception)(T function() value)
64 if (!is(T == return))
65 {
66 	import std.functional : toDelegate;
67 	return threadAsync(value.toDelegate);
68 }
69 
70 unittest
71 {
72 	import ae.net.asockets : socketManager;
73 
74 	int ok;
75 
76 	Thread.sleep(1.msecs).threadAsync.then(() { ok++; });
77 	"foo".threadAsync.dmd21804workaround.then((s) { ok += s == "foo"; });
78 	(){throw new Exception("yolo");}().threadAsync.then((){}, (e) { ok += e.msg == "yolo"; });
79 
80 	socketManager.loop();
81 	assert(ok == 3, [cast(char)('0'+ok)]);
82 }
83 
84 // ****************************************************************************
85 
86 /// Given a function `fun` which returns a promise,
87 /// globally memoize it (across all threads),
88 /// so that at most one invocation of `fun` with the given parameters
89 /// is invoked during the program's lifetime.
90 /// If a `fun` promise is in progress (incl. if started
91 /// by another thread), wait for it to finish first.
92 template globallyMemoized(alias fun)
93 if (is(ReturnType!fun == Promise!(T, E), T, E))
94 {
95 	alias P = ReturnType!fun;
96 	static if (is(FunctionTypeOf!fun PT == __parameters))
97 	{
98 		P globallyMemoized(PT args)
99 		{
100 			// At any point in time (excluding times when it is
101 			// being updated, i.e. when the mutex lock is held),
102 			// a cache entry can be in one of three states:
103 			// - nonexistent (no one tried to calculate this result yet)
104 			// - in progress (someone started work on this)
105 			// - resolved (the work has finished)
106 			static struct Key
107 			{
108 				Parameters!fun args;
109 			}
110 			static struct Entry
111 			{
112 				ThreadAnchor ownerThread;
113 				P realPromise; // only safe to access in ownerThread
114 				bool resolved; // avoid thread roundtrip, as an optimization
115 			}
116 			__gshared Entry*[Key] cache;
117 
118 			auto key = Key(args);
119 
120 			P localPromise;
121 			synchronized cache.updateVoid(key,
122 				{
123 					localPromise = fun(args);
124 					return new Entry(thisThread, localPromise);
125 				},
126 				(ref Entry* entry)
127 				{
128 					if (entry.resolved)
129 					{
130 						// If we know that the promise is settled,
131 						// then it is effectively immutable,
132 						// and it is safe to return it as is.
133 						localPromise = entry.realPromise;
134 					}
135 					else
136 					{
137 						auto localThread = thisThread;
138 						if (entry.ownerThread is localThread)
139 						{
140 							// We are in the thread that owns this promise.
141 							// Just return it.
142 							localPromise = entry.realPromise;
143 						}
144 						else
145 						{
146 							// Go to the thread that created the promise,
147 							// and append a continuation which goes back to our thread
148 							// and resolves the returned promise.
149 							localPromise = new P;
150 							entry.ownerThread.runAsync({
151 								entry.realPromise.then((P.ValueTuple value) {
152 									entry.resolved = true;
153 									localThread.runAsync({
154 										localPromise.fulfill(value);
155 									});
156 								}, (error) {
157 									entry.resolved = true;
158 									localThread.runAsync({
159 										localPromise.reject(error);
160 									});
161 								});
162 							});
163 						}
164 					}
165 				});
166 			return localPromise;
167 		}
168 	}
169 	else
170 		static assert(false, "Not a function: " ~ __traits(identifier, fun));
171 }
172 
173 unittest
174 {
175 	Promise!void funImpl() { return resolve(); }
176 	alias fun = globallyMemoized!funImpl;
177 }
178 
179 // ****************************************************************************
180 
181 /// Runs tasks asynchronously in an ordered manner.
182 /// For each `put` call, return a `Promise` which
183 /// resolves to the given delegate's return value.
184 /// The `taskFun` is evaluated in a separate thread.
185 /// Unlike `threadAsync`, at most one task will execute
186 /// at any given time (per `AsyncQueue` instance),
187 /// they will be executed in the order of the `put` calls,
188 /// and the promises will be resolved in the main thread
189 /// in the same order.
190 final class AsyncQueue(T, E = Exception)
191 {
192 	this()
193 	{
194 		// Note: std.concurrency can't support daemon tasks
195 		anchor = new ThreadAnchor(No.daemon);
196 		tid = spawn(&threadFunc, thisTid);
197 	} ///
198 
199 	Promise!(T, E) put(T delegate() taskFun)
200 	{
201 		auto promise = new Promise!(T, E);
202 		tid.send(cast(immutable)Task(taskFun, promise, anchor));
203 		return promise;
204 	} ///
205 
206 	/// Close the queue. Must be called to free up resources
207 	/// (thread and message queue).
208 	void close()
209 	{
210 		tid.send(cast(immutable)EOF(anchor));
211 		anchor = null;
212 	}
213 
214 private:
215 	import std.concurrency : spawn, send, receive, Tid, thisTid;
216 
217 	ThreadAnchor anchor;
218 	Tid tid;
219 
220 	struct Task
221 	{
222 		T delegate() fun;
223 		Promise!(T, E) promise;
224 		ThreadAnchor anchor;
225 	}
226 	struct EOF
227 	{
228 		ThreadAnchor anchor;
229 	}
230 
231 	static void threadFunc(Tid _)
232 	{
233 		bool done;
234 		while (!done)
235 		{
236 			receive(
237 				(immutable Task immutableTask)
238 				{
239 					auto task = cast()immutableTask;
240 					try
241 					{
242 						auto result = task.fun().voidStruct;
243 						task.anchor.runAsync({
244 							task.promise.fulfill(result.tupleof);
245 						});
246 					}
247 					catch (E e)
248 						task.anchor.runAsync({
249 							task.promise.reject(e);
250 						});
251 				},
252 				(immutable EOF immutableEOF)
253 				{
254 					auto eof = cast()immutableEOF;
255 					eof.anchor.close();
256 					done = true;
257 				},
258 			);
259 		}
260 	}
261 }
262 
263 unittest
264 {
265 	import ae.net.asockets : socketManager;
266 
267 	int[] result;
268 	{
269 		auto queue = new AsyncQueue!void;
270 		scope(exit) queue.close();
271 		auto taskFun(int n) { return () { Thread.sleep(n.msecs); result ~= n; }; }
272 		queue.put(taskFun(200));
273 		queue.put(taskFun(100));
274 	}
275 	socketManager.loop();
276 	assert(result == [200, 100]);
277 }