1 /**
2  * async/await-like API for asynchronous tasks combining promises and
3  * fibers.
4  *
5  * License:
6  *   This Source Code Form is subject to the terms of
7  *   the Mozilla Public License, v. 2.0. If a copy of
8  *   the MPL was not distributed with this file, You
9  *   can obtain one at http://mozilla.org/MPL/2.0/.
10  *
11  * Authors:
12  *   Vladimir Panteleev <ae@cy.md>
13  */
14 
15 module ae.utils.promise.await;
16 
17 import core.thread : Fiber;
18 
19 import ae.net.asockets : socketManager;
20 import ae.utils.promise;
21 
22 enum defaultFiberSize = 64 * 1024;
23 
24 /// Evaluates `task` in a new fiber, and returns a promise which is
25 /// fulfilled when `task` exits.  `task` may use `await` to block on
26 /// other promises.
27 // TODO: is using lazy OK for this? https://issues.dlang.org/show_bug.cgi?id=23923
28 Promise!(T, E) async(T, E = Exception)(lazy T task, size_t size = defaultFiberSize)
29 if (!is(T == return))
30 {
31 	return async({ return task; }, size);
32 }
33 
34 /// ditto
35 Promise!(T, E) async(T, E = Exception)(T delegate() task, size_t size = defaultFiberSize)
36 if (!is(T == return))
37 {
38 	auto p = new Promise!T;
39 	auto f = new Fiber({
40 		try
41 			static if (is(T == void))
42 				task(), p.fulfill();
43 			else
44 				p.fulfill(task());
45 		catch (E e)
46 			p.reject(e);
47 	}, size);
48 	f.call();
49 	return p;
50 }
51 
52 /// ditto
53 Promise!(T, E) async(T, E = Exception)(T function() task)
54 if (!is(T == return))
55 {
56 	import std.functional : toDelegate;
57 	return async(task.toDelegate);
58 }
59 
60 /// Synchronously waits until the promise `p` is fulfilled.
61 /// Can only be called in a fiber.
62 T await(T, E)(Promise!(T, E) p)
63 {
64 	Promise!T.ValueTuple fiberValue;
65 	E fiberError;
66 
67 	auto f = Fiber.getThis();
68 	assert(f, "await called while not in a fiber");
69 	p.then((Promise!T.ValueTuple value) {
70 		fiberValue = value;
71 		f.call();
72 	}, (E error) {
73 		fiberError = error;
74 		f.call();
75 	});
76 	Fiber.yield();
77 	if (fiberError)
78 		throw fiberError;
79 	else
80 	{
81 		static if (!is(T == void))
82 			return fiberValue[0];
83 	}
84 }
85 
86 ///
87 debug(ae_unittest) unittest
88 {
89 	import ae.net.asockets : socketManager;
90 
91 	auto one = resolve(1);
92 	auto two = resolve(2);
93 
94 	int sum;
95 	async(one.await + two.await).then((value) {
96 		sum = value;
97 	});
98 	socketManager.loop();
99 	assert(sum == 3);
100 }
101 
102 debug(ae_unittest) unittest
103 {
104 	if (false)
105 	{
106 		async({}).await();
107 		async({}()).await();
108 	}
109 }
110 
111 /// Synchronously starts and event loop and waits for it to exit.
112 /// Assumes that the promise `p` is resolved during the event loop;
113 /// Propagates any return value or exception to the caller.
114 T awaitSync(T, E)(Promise!(T, E) p)
115 {
116 	bool completed;
117 	Promise!T.ValueTuple taskValue;
118 	E taskError;
119 
120 	p.then((Promise!T.ValueTuple value) {
121 		completed = true;
122 		taskValue = value;
123 	}, (E error) {
124 		completed = true;
125 		taskError = error;
126 	});
127 
128 	socketManager.loop();
129 
130 	assert(completed, "Event loop exited but the promise remained unresolved");
131 	if (taskError)
132 		throw taskError;
133 	else
134 	{
135 		static if (!is(T == void))
136 			return taskValue[0];
137 	}
138 }
139 
140 debug(ae_unittest) unittest
141 {
142 	if (false)
143 	{
144 		async({}).awaitSync();
145 		async({}()).awaitSync();
146 	}
147 }